Whamcloud - gitweb
b8ee00aab51fc195b0cc0126189060f65b5336ea
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_MDS_GROUP(lsm->lsm_object_gr);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         body->oa = *oinfo->oi_oa;
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
214                                   lustre_swab_ost_body);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
265 {
266         struct ptlrpc_request *req;
267         struct ost_body       *body;
268         int                    rc;
269         ENTRY;
270
271         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
272         if (req == NULL)
273                 RETURN(-ENOMEM);
274
275         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
276         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
277         if (rc) {
278                 ptlrpc_request_free(req);
279                 RETURN(rc);
280         }
281
282         osc_pack_req_body(req, oinfo);
283
284         ptlrpc_request_set_replen(req);
285
286         rc = ptlrpc_queue_wait(req);
287         if (rc)
288                 GOTO(out, rc);
289
290         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
291         if (body == NULL)
292                 GOTO(out, rc = -EPROTO);
293
294         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
295         *oinfo->oi_oa = body->oa;
296
297         /* This should really be sent by the OST */
298         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
299         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
300
301         EXIT;
302  out:
303         ptlrpc_req_finished(req);
304         return rc;
305 }
306
307 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
308                        struct obd_trans_info *oti)
309 {
310         struct ptlrpc_request *req;
311         struct ost_body       *body;
312         int                    rc;
313         ENTRY;
314
315         LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
316                  CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
317                  "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
318                  oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         *oinfo->oi_oa = body->oa;
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_async_args *aa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         *aa->aa_oi->oi_oa = body->oa;
366 out:
367         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
368         RETURN(rc);
369 }
370
371 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
372                              struct obd_trans_info *oti,
373                              struct ptlrpc_request_set *rqset)
374 {
375         struct ptlrpc_request *req;
376         struct osc_async_args *aa;
377         int                    rc;
378         ENTRY;
379
380         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
381         if (req == NULL)
382                 RETURN(-ENOMEM);
383
384         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
385         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
386         if (rc) {
387                 ptlrpc_request_free(req);
388                 RETURN(rc);
389         }
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
396                 LASSERT(oti);
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398         }
399
400         /* do mds to ost setattr asynchronously */
401         if (!rqset) {
402                 /* Do not wait for response. */
403                 ptlrpcd_add_req(req, PSCOPE_OTHER);
404         } else {
405                 req->rq_interpret_reply =
406                         (ptlrpc_interpterer_t)osc_setattr_interpret;
407
408                 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
409                 aa = ptlrpc_req_async_args(req);
410                 aa->aa_oi = oinfo;
411
412                 ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 int osc_real_create(struct obd_export *exp, struct obdo *oa,
419                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
420 {
421         struct ptlrpc_request *req;
422         struct ost_body       *body;
423         struct lov_stripe_md  *lsm;
424         int                    rc;
425         ENTRY;
426
427         LASSERT(oa);
428         LASSERT(ea);
429
430         lsm = *ea;
431         if (!lsm) {
432                 rc = obd_alloc_memmd(exp, &lsm);
433                 if (rc < 0)
434                         RETURN(rc);
435         }
436
437         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
438         if (req == NULL)
439                 GOTO(out, rc = -ENOMEM);
440
441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442         if (rc) {
443                 ptlrpc_request_free(req);
444                 GOTO(out, rc);
445         }
446
447         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
448         LASSERT(body);
449         body->oa = *oa;
450
451         ptlrpc_request_set_replen(req);
452
453         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
454             oa->o_flags == OBD_FL_DELORPHAN) {
455                 DEBUG_REQ(D_HA, req,
456                           "delorphan from OST integration");
457                 /* Don't resend the delorphan req */
458                 req->rq_no_resend = req->rq_no_delay = 1;
459         }
460
461         rc = ptlrpc_queue_wait(req);
462         if (rc)
463                 GOTO(out_req, rc);
464
465         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466         if (body == NULL)
467                 GOTO(out_req, rc = -EPROTO);
468
469         *oa = body->oa;
470
471         /* This should really be sent by the OST */
472         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
473         oa->o_valid |= OBD_MD_FLBLKSZ;
474
475         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
476          * have valid lsm_oinfo data structs, so don't go touching that.
477          * This needs to be fixed in a big way.
478          */
479         lsm->lsm_object_id = oa->o_id;
480         lsm->lsm_object_gr = oa->o_gr;
481         *ea = lsm;
482
483         if (oti != NULL) {
484                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
485
486                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
487                         if (!oti->oti_logcookies)
488                                 oti_alloc_cookies(oti, 1);
489                         *oti->oti_logcookies = oa->o_lcookie;
490                 }
491         }
492
493         CDEBUG(D_HA, "transno: "LPD64"\n",
494                lustre_msg_get_transno(req->rq_repmsg));
495 out_req:
496         ptlrpc_req_finished(req);
497 out:
498         if (rc && !*ea)
499                 obd_free_memmd(exp, &lsm);
500         RETURN(rc);
501 }
502
503 static int osc_punch_interpret(const struct lu_env *env,
504                                struct ptlrpc_request *req,
505                                struct osc_punch_args *aa, int rc)
506 {
507         struct ost_body *body;
508         ENTRY;
509
510         if (rc != 0)
511                 GOTO(out, rc);
512
513         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         if (body == NULL)
515                 GOTO(out, rc = -EPROTO);
516
517         *aa->pa_oa = body->oa;
518 out:
519         rc = aa->pa_upcall(aa->pa_cookie, rc);
520         RETURN(rc);
521 }
522
523 int osc_punch_base(struct obd_export *exp, struct obdo *oa,
524                    struct obd_capa *capa,
525                    obd_enqueue_update_f upcall, void *cookie,
526                    struct ptlrpc_request_set *rqset)
527 {
528         struct ptlrpc_request *req;
529         struct osc_punch_args *aa;
530         struct ost_body       *body;
531         int                    rc;
532         ENTRY;
533
534         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
535         if (req == NULL)
536                 RETURN(-ENOMEM);
537
538         osc_set_capa_size(req, &RMF_CAPA1, capa);
539         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
540         if (rc) {
541                 ptlrpc_request_free(req);
542                 RETURN(rc);
543         }
544         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
545         ptlrpc_at_set_req_timeout(req);
546
547         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
548         LASSERT(body);
549         body->oa = *oa;
550         osc_pack_capa(req, body, capa);
551
552         ptlrpc_request_set_replen(req);
553
554
555         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
556         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
557         aa = ptlrpc_req_async_args(req);
558         aa->pa_oa     = oa;
559         aa->pa_upcall = upcall;
560         aa->pa_cookie = cookie;
561         if (rqset == PTLRPCD_SET)
562                 ptlrpcd_add_req(req, PSCOPE_OTHER);
563         else
564                 ptlrpc_set_add_req(rqset, req);
565
566         RETURN(0);
567 }
568
569 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
570                      struct obd_trans_info *oti,
571                      struct ptlrpc_request_set *rqset)
572 {
573         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
574         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
575         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
576         return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
577                               oinfo->oi_cb_up, oinfo, rqset);
578 }
579
580 static int osc_sync(struct obd_export *exp, struct obdo *oa,
581                     struct lov_stripe_md *md, obd_size start, obd_size end,
582                     void *capa)
583 {
584         struct ptlrpc_request *req;
585         struct ost_body       *body;
586         int                    rc;
587         ENTRY;
588
589         if (!oa) {
590                 CDEBUG(D_INFO, "oa NULL\n");
591                 RETURN(-EINVAL);
592         }
593
594         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
595         if (req == NULL)
596                 RETURN(-ENOMEM);
597
598         osc_set_capa_size(req, &RMF_CAPA1, capa);
599         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
600         if (rc) {
601                 ptlrpc_request_free(req);
602                 RETURN(rc);
603         }
604
605         /* overload the size and blocks fields in the oa with start/end */
606         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
607         LASSERT(body);
608         body->oa = *oa;
609         body->oa.o_size = start;
610         body->oa.o_blocks = end;
611         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
612         osc_pack_capa(req, body, capa);
613
614         ptlrpc_request_set_replen(req);
615
616         rc = ptlrpc_queue_wait(req);
617         if (rc)
618                 GOTO(out, rc);
619
620         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
621         if (body == NULL)
622                 GOTO(out, rc = -EPROTO);
623
624         *oa = body->oa;
625
626         EXIT;
627  out:
628         ptlrpc_req_finished(req);
629         return rc;
630 }
631
632 /* Find and cancel locally locks matched by @mode in the resource found by
633  * @objid. Found locks are added into @cancel list. Returns the amount of
634  * locks added to @cancels list. */
635 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
636                                    struct list_head *cancels, ldlm_mode_t mode,
637                                    int lock_flags)
638 {
639         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
640         struct ldlm_res_id res_id;
641         struct ldlm_resource *res;
642         int count;
643         ENTRY;
644
645         osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
646         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
647         if (res == NULL)
648                 RETURN(0);
649
650         LDLM_RESOURCE_ADDREF(res);
651         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
652                                            lock_flags, 0, NULL);
653         LDLM_RESOURCE_DELREF(res);
654         ldlm_resource_putref(res);
655         RETURN(count);
656 }
657
658 static int osc_destroy_interpret(const struct lu_env *env,
659                                  struct ptlrpc_request *req, void *data,
660                                  int rc)
661 {
662         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
663
664         atomic_dec(&cli->cl_destroy_in_flight);
665         cfs_waitq_signal(&cli->cl_destroy_waitq);
666         return 0;
667 }
668
669 static int osc_can_send_destroy(struct client_obd *cli)
670 {
671         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
672             cli->cl_max_rpcs_in_flight) {
673                 /* The destroy request can be sent */
674                 return 1;
675         }
676         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
677             cli->cl_max_rpcs_in_flight) {
678                 /*
679                  * The counter has been modified between the two atomic
680                  * operations.
681                  */
682                 cfs_waitq_signal(&cli->cl_destroy_waitq);
683         }
684         return 0;
685 }
686
687 /* Destroy requests can be async always on the client, and we don't even really
688  * care about the return code since the client cannot do anything at all about
689  * a destroy failure.
690  * When the MDS is unlinking a filename, it saves the file objects into a
691  * recovery llog, and these object records are cancelled when the OST reports
692  * they were destroyed and sync'd to disk (i.e. transaction committed).
693  * If the client dies, or the OST is down when the object should be destroyed,
694  * the records are not cancelled, and when the OST reconnects to the MDS next,
695  * it will retrieve the llog unlink logs and then sends the log cancellation
696  * cookies to the MDS after committing destroy transactions. */
697 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
698                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
699                        struct obd_export *md_export, void *capa)
700 {
701         struct client_obd     *cli = &exp->exp_obd->u.cli;
702         struct ptlrpc_request *req;
703         struct ost_body       *body;
704         CFS_LIST_HEAD(cancels);
705         int rc, count;
706         ENTRY;
707
708         if (!oa) {
709                 CDEBUG(D_INFO, "oa NULL\n");
710                 RETURN(-EINVAL);
711         }
712
713         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
714                                         LDLM_FL_DISCARD_DATA);
715
716         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
717         if (req == NULL) {
718                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
719                 RETURN(-ENOMEM);
720         }
721
722         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
723         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
724                                0, &cancels, count);
725         if (rc) {
726                 ptlrpc_request_free(req);
727                 RETURN(rc);
728         }
729
730         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
731         ptlrpc_at_set_req_timeout(req);
732
733         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
734                 oa->o_lcookie = *oti->oti_logcookies;
735         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
736         LASSERT(body);
737         body->oa = *oa;
738
739         osc_pack_capa(req, body, (struct obd_capa *)capa);
740         ptlrpc_request_set_replen(req);
741
742         /* don't throttle destroy RPCs for the MDT */
743         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
744                 req->rq_interpret_reply = osc_destroy_interpret;
745                 if (!osc_can_send_destroy(cli)) {
746                         struct l_wait_info lwi = { 0 };
747
748                         /*
749                          * Wait until the number of on-going destroy RPCs drops
750                          * under max_rpc_in_flight
751                          */
752                         l_wait_event_exclusive(cli->cl_destroy_waitq,
753                                                osc_can_send_destroy(cli), &lwi);
754                 }
755         }
756
757         /* Do not wait for response */
758         ptlrpcd_add_req(req, PSCOPE_OTHER);
759         RETURN(0);
760 }
761
762 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
763                                 long writing_bytes)
764 {
765         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
766
767         LASSERT(!(oa->o_valid & bits));
768
769         oa->o_valid |= bits;
770         client_obd_list_lock(&cli->cl_loi_list_lock);
771         oa->o_dirty = cli->cl_dirty;
772         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
773                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
774                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
775                 oa->o_undirty = 0;
776         } else if (atomic_read(&obd_dirty_pages) -
777                    atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
778                 CERROR("dirty %d - %d > system dirty_max %d\n",
779                        atomic_read(&obd_dirty_pages),
780                        atomic_read(&obd_dirty_transit_pages),
781                        obd_max_dirty_pages);
782                 oa->o_undirty = 0;
783         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
784                 CERROR("dirty %lu - dirty_max %lu too big???\n",
785                        cli->cl_dirty, cli->cl_dirty_max);
786                 oa->o_undirty = 0;
787         } else {
788                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
789                                 (cli->cl_max_rpcs_in_flight + 1);
790                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
791         }
792         oa->o_grant = cli->cl_avail_grant;
793         oa->o_dropped = cli->cl_lost_grant;
794         cli->cl_lost_grant = 0;
795         client_obd_list_unlock(&cli->cl_loi_list_lock);
796         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
797                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
798
799 }
800
801 static void osc_update_next_shrink(struct client_obd *cli)
802 {
803         int time = GRANT_SHRINK_INTERVAL;
804         cli->cl_next_shrink_grant = cfs_time_shift(time);
805         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
806                cli->cl_next_shrink_grant);
807 }
808
809 /* caller must hold loi_list_lock */
810 static void osc_consume_write_grant(struct client_obd *cli,
811                                     struct brw_page *pga)
812 {
813         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
814         atomic_inc(&obd_dirty_pages);
815         cli->cl_dirty += CFS_PAGE_SIZE;
816         cli->cl_avail_grant -= CFS_PAGE_SIZE;
817         pga->flag |= OBD_BRW_FROM_GRANT;
818         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
819                CFS_PAGE_SIZE, pga, pga->pg);
820         LASSERT(cli->cl_avail_grant >= 0);
821         osc_update_next_shrink(cli);
822 }
823
824 /* the companion to osc_consume_write_grant, called when a brw has completed.
825  * must be called with the loi lock held. */
826 static void osc_release_write_grant(struct client_obd *cli,
827                                     struct brw_page *pga, int sent)
828 {
829         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
830         ENTRY;
831
832         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
833                 EXIT;
834                 return;
835         }
836
837         pga->flag &= ~OBD_BRW_FROM_GRANT;
838         atomic_dec(&obd_dirty_pages);
839         cli->cl_dirty -= CFS_PAGE_SIZE;
840         if (pga->flag & OBD_BRW_NOCACHE) {
841                 pga->flag &= ~OBD_BRW_NOCACHE;
842                 atomic_dec(&obd_dirty_transit_pages);
843                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
844         }
845         if (!sent) {
846                 cli->cl_lost_grant += CFS_PAGE_SIZE;
847                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
848                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
849         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
850                 /* For short writes we shouldn't count parts of pages that
851                  * span a whole block on the OST side, or our accounting goes
852                  * wrong.  Should match the code in filter_grant_check. */
853                 int offset = pga->off & ~CFS_PAGE_MASK;
854                 int count = pga->count + (offset & (blocksize - 1));
855                 int end = (offset + pga->count) & (blocksize - 1);
856                 if (end)
857                         count += blocksize - end;
858
859                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
860                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
861                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
862                        cli->cl_avail_grant, cli->cl_dirty);
863         }
864
865         EXIT;
866 }
867
868 static unsigned long rpcs_in_flight(struct client_obd *cli)
869 {
870         return cli->cl_r_in_flight + cli->cl_w_in_flight;
871 }
872
873 /* caller must hold loi_list_lock */
874 void osc_wake_cache_waiters(struct client_obd *cli)
875 {
876         struct list_head *l, *tmp;
877         struct osc_cache_waiter *ocw;
878
879         ENTRY;
880         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
881                 /* if we can't dirty more, we must wait until some is written */
882                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
883                    (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
884                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
885                                "osc max %ld, sys max %d\n", cli->cl_dirty,
886                                cli->cl_dirty_max, obd_max_dirty_pages);
887                         return;
888                 }
889
890                 /* if still dirty cache but no grant wait for pending RPCs that
891                  * may yet return us some grant before doing sync writes */
892                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
893                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
894                                cli->cl_w_in_flight);
895                         return;
896                 }
897
898                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
899                 list_del_init(&ocw->ocw_entry);
900                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
901                         /* no more RPCs in flight to return grant, do sync IO */
902                         ocw->ocw_rc = -EDQUOT;
903                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
904                 } else {
905                         osc_consume_write_grant(cli,
906                                                 &ocw->ocw_oap->oap_brw_page);
907                 }
908
909                 cfs_waitq_signal(&ocw->ocw_waitq);
910         }
911
912         EXIT;
913 }
914
915 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
919         if (body->oa.o_valid & OBD_MD_FLGRANT)
920                 cli->cl_avail_grant += body->oa.o_grant;
921         /* waiters are woken in brw_interpret */
922         client_obd_list_unlock(&cli->cl_loi_list_lock);
923 }
924
925 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
926                               void *key, obd_count vallen, void *val,
927                               struct ptlrpc_request_set *set);
928
929 static int osc_shrink_grant_interpret(const struct lu_env *env,
930                                       struct ptlrpc_request *req,
931                                       void *aa, int rc)
932 {
933         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
934         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
935         struct ost_body *body;
936         
937         if (rc != 0) {
938                 client_obd_list_lock(&cli->cl_loi_list_lock);
939                 cli->cl_avail_grant += oa->o_grant;
940                 client_obd_list_unlock(&cli->cl_loi_list_lock);
941                 GOTO(out, rc);
942         }
943
944         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
945         LASSERT(body);
946         osc_update_grant(cli, body);
947 out:
948         OBD_FREE_PTR(oa);
949         return rc;        
950 }
951
952 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
953 {
954         client_obd_list_lock(&cli->cl_loi_list_lock);
955         oa->o_grant = cli->cl_avail_grant / 4;
956         cli->cl_avail_grant -= oa->o_grant; 
957         client_obd_list_unlock(&cli->cl_loi_list_lock);
958         oa->o_flags |= OBD_FL_SHRINK_GRANT;
959         osc_update_next_shrink(cli);
960 }
961
962 static int osc_shrink_grant(struct client_obd *cli)
963 {
964         int    rc = 0;
965         struct ost_body     *body;
966         ENTRY;
967
968         OBD_ALLOC_PTR(body);
969         if (!body)
970                 RETURN(-ENOMEM);
971
972         osc_announce_cached(cli, &body->oa, 0);
973         osc_shrink_grant_local(cli, &body->oa);
974         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
975                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
976                                 sizeof(*body), body, NULL);
977         if (rc) {
978                 client_obd_list_lock(&cli->cl_loi_list_lock);
979                 cli->cl_avail_grant += body->oa.o_grant;
980                 client_obd_list_unlock(&cli->cl_loi_list_lock);
981         }
982         if (body)
983                OBD_FREE_PTR(body);
984         RETURN(rc);
985 }
986
987 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
988 static int osc_should_shrink_grant(struct client_obd *client)
989 {
990         cfs_time_t time = cfs_time_current();
991         cfs_time_t next_shrink = client->cl_next_shrink_grant;
992         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
993                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
994                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
995                         return 1;
996                 else
997                         osc_update_next_shrink(client);
998         }
999         return 0;
1000 }
1001
1002 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1003 {
1004         struct client_obd *client;
1005
1006         list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
1007                 if (osc_should_shrink_grant(client))
1008                         osc_shrink_grant(client);
1009         }
1010         return 0;
1011 }
1012
1013 static int osc_add_shrink_grant(struct client_obd *client)
1014 {
1015         int rc;
1016
1017         rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL, 
1018                                          TIMEOUT_GRANT,
1019                                          osc_grant_shrink_grant_cb, NULL,
1020                                          &client->cl_grant_shrink_list);
1021         if (rc) {
1022                 CERROR("add grant client %s error %d\n", 
1023                         client->cl_import->imp_obd->obd_name, rc);
1024                 return rc;
1025         }
1026         CDEBUG(D_CACHE, "add grant client %s \n", 
1027                client->cl_import->imp_obd->obd_name);
1028         osc_update_next_shrink(client);
1029         return 0; 
1030 }
1031
1032 static int osc_del_shrink_grant(struct client_obd *client)
1033 {
1034         CDEBUG(D_CACHE, "del grant client %s \n", 
1035                client->cl_import->imp_obd->obd_name);
1036         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list);
1037 }
1038
1039 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1040 {
1041         client_obd_list_lock(&cli->cl_loi_list_lock);
1042         cli->cl_avail_grant = ocd->ocd_grant;
1043         client_obd_list_unlock(&cli->cl_loi_list_lock);
1044
1045         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1046             list_empty(&cli->cl_grant_shrink_list))
1047                 osc_add_shrink_grant(cli);
1048
1049         CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1050                cli->cl_avail_grant, cli->cl_lost_grant);
1051         LASSERT(cli->cl_avail_grant >= 0);
1052 }
1053
1054 /* We assume that the reason this OSC got a short read is because it read
1055  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1056  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1057  * this stripe never got written at or beyond this stripe offset yet. */
1058 static void handle_short_read(int nob_read, obd_count page_count,
1059                               struct brw_page **pga)
1060 {
1061         char *ptr;
1062         int i = 0;
1063
1064         /* skip bytes read OK */
1065         while (nob_read > 0) {
1066                 LASSERT (page_count > 0);
1067
1068                 if (pga[i]->count > nob_read) {
1069                         /* EOF inside this page */
1070                         ptr = cfs_kmap(pga[i]->pg) +
1071                                 (pga[i]->off & ~CFS_PAGE_MASK);
1072                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1073                         cfs_kunmap(pga[i]->pg);
1074                         page_count--;
1075                         i++;
1076                         break;
1077                 }
1078
1079                 nob_read -= pga[i]->count;
1080                 page_count--;
1081                 i++;
1082         }
1083
1084         /* zero remaining pages */
1085         while (page_count-- > 0) {
1086                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1087                 memset(ptr, 0, pga[i]->count);
1088                 cfs_kunmap(pga[i]->pg);
1089                 i++;
1090         }
1091 }
1092
1093 static int check_write_rcs(struct ptlrpc_request *req,
1094                            int requested_nob, int niocount,
1095                            obd_count page_count, struct brw_page **pga)
1096 {
1097         int    *remote_rcs, i;
1098
1099         /* return error if any niobuf was in error */
1100         remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
1101                                         sizeof(*remote_rcs) * niocount, NULL);
1102         if (remote_rcs == NULL) {
1103                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1104                 return(-EPROTO);
1105         }
1106         if (lustre_msg_swabbed(req->rq_repmsg))
1107                 for (i = 0; i < niocount; i++)
1108                         __swab32s(&remote_rcs[i]);
1109
1110         for (i = 0; i < niocount; i++) {
1111                 if (remote_rcs[i] < 0)
1112                         return(remote_rcs[i]);
1113
1114                 if (remote_rcs[i] != 0) {
1115                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1116                                 i, remote_rcs[i], req);
1117                         return(-EPROTO);
1118                 }
1119         }
1120
1121         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1122                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1123                        req->rq_bulk->bd_nob_transferred, requested_nob);
1124                 return(-EPROTO);
1125         }
1126
1127         return (0);
1128 }
1129
1130 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1131 {
1132         if (p1->flag != p2->flag) {
1133                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1134                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC);
1135
1136                 /* warn if we try to combine flags that we don't know to be
1137                  * safe to combine */
1138                 if ((p1->flag & mask) != (p2->flag & mask))
1139                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1140                                "same brw?\n", p1->flag, p2->flag);
1141                 return 0;
1142         }
1143
1144         return (p1->off + p1->count == p2->off);
1145 }
1146
1147 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1148                                    struct brw_page **pga, int opc,
1149                                    cksum_type_t cksum_type)
1150 {
1151         __u32 cksum;
1152         int i = 0;
1153
1154         LASSERT (pg_count > 0);
1155         cksum = init_checksum(cksum_type);
1156         while (nob > 0 && pg_count > 0) {
1157                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1158                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1159                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1160
1161                 /* corrupt the data before we compute the checksum, to
1162                  * simulate an OST->client data error */
1163                 if (i == 0 && opc == OST_READ &&
1164                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1165                         memcpy(ptr + off, "bad1", min(4, nob));
1166                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1167                 cfs_kunmap(pga[i]->pg);
1168                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1169                                off, cksum);
1170
1171                 nob -= pga[i]->count;
1172                 pg_count--;
1173                 i++;
1174         }
1175         /* For sending we only compute the wrong checksum instead
1176          * of corrupting the data so it is still correct on a redo */
1177         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1178                 cksum++;
1179
1180         return cksum;
1181 }
1182
1183 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1184                                 struct lov_stripe_md *lsm, obd_count page_count,
1185                                 struct brw_page **pga,
1186                                 struct ptlrpc_request **reqp,
1187                                 struct obd_capa *ocapa, int reserve)
1188 {
1189         struct ptlrpc_request   *req;
1190         struct ptlrpc_bulk_desc *desc;
1191         struct ost_body         *body;
1192         struct obd_ioobj        *ioobj;
1193         struct niobuf_remote    *niobuf;
1194         int niocount, i, requested_nob, opc, rc;
1195         struct osc_brw_async_args *aa;
1196         struct req_capsule      *pill;
1197         struct brw_page *pg_prev;
1198
1199         ENTRY;
1200         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1201                 RETURN(-ENOMEM); /* Recoverable */
1202         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1203                 RETURN(-EINVAL); /* Fatal */
1204
1205         if ((cmd & OBD_BRW_WRITE) != 0) {
1206                 opc = OST_WRITE;
1207                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1208                                                 cli->cl_import->imp_rq_pool,
1209                                                 &RQF_OST_BRW);
1210         } else {
1211                 opc = OST_READ;
1212                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
1213         }
1214         if (req == NULL)
1215                 RETURN(-ENOMEM);
1216
1217         for (niocount = i = 1; i < page_count; i++) {
1218                 if (!can_merge_pages(pga[i - 1], pga[i]))
1219                         niocount++;
1220         }
1221
1222         pill = &req->rq_pill;
1223         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1224                              niocount * sizeof(*niobuf));
1225         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1226
1227         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1228         if (rc) {
1229                 ptlrpc_request_free(req);
1230                 RETURN(rc);
1231         }
1232         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1233         ptlrpc_at_set_req_timeout(req);
1234
1235         if (opc == OST_WRITE)
1236                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1237                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1238         else
1239                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1240                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1241
1242         if (desc == NULL)
1243                 GOTO(out, rc = -ENOMEM);
1244         /* NB request now owns desc and will free it when it gets freed */
1245
1246         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1247         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1248         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1249         LASSERT(body && ioobj && niobuf);
1250
1251         body->oa = *oa;
1252
1253         obdo_to_ioobj(oa, ioobj);
1254         ioobj->ioo_bufcnt = niocount;
1255         osc_pack_capa(req, body, ocapa);
1256         LASSERT (page_count > 0);
1257         pg_prev = pga[0];
1258         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1259                 struct brw_page *pg = pga[i];
1260
1261                 LASSERT(pg->count > 0);
1262                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1263                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1264                          pg->off, pg->count);
1265 #ifdef __linux__
1266                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1267                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1268                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1269                          i, page_count,
1270                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1271                          pg_prev->pg, page_private(pg_prev->pg),
1272                          pg_prev->pg->index, pg_prev->off);
1273 #else
1274                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1275                          "i %d p_c %u\n", i, page_count);
1276 #endif
1277                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1278                         (pg->flag & OBD_BRW_SRVLOCK));
1279
1280                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1281                                       pg->count);
1282                 requested_nob += pg->count;
1283
1284                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1285                         niobuf--;
1286                         niobuf->len += pg->count;
1287                 } else {
1288                         niobuf->offset = pg->off;
1289                         niobuf->len    = pg->count;
1290                         niobuf->flags  = pg->flag;
1291                 }
1292                 pg_prev = pg;
1293         }
1294
1295         LASSERTF((void *)(niobuf - niocount) ==
1296                 lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
1297                                niocount * sizeof(*niobuf)),
1298                 "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
1299                 REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
1300                 (void *)(niobuf - niocount));
1301
1302         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1303         if (osc_should_shrink_grant(cli))
1304                 osc_shrink_grant_local(cli, &body->oa); 
1305
1306         /* size[REQ_REC_OFF] still sizeof (*body) */
1307         if (opc == OST_WRITE) {
1308                 if (unlikely(cli->cl_checksum) &&
1309                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1310                         /* store cl_cksum_type in a local variable since
1311                          * it can be changed via lprocfs */
1312                         cksum_type_t cksum_type = cli->cl_cksum_type;
1313
1314                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1315                                 oa->o_flags = body->oa.o_flags = 0;
1316                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1317                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1318                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1319                                                              page_count, pga,
1320                                                              OST_WRITE,
1321                                                              cksum_type);
1322                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1323                                body->oa.o_cksum);
1324                         /* save this in 'oa', too, for later checking */
1325                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1326                         oa->o_flags |= cksum_type_pack(cksum_type);
1327                 } else {
1328                         /* clear out the checksum flag, in case this is a
1329                          * resend but cl_checksum is no longer set. b=11238 */
1330                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1331                 }
1332                 oa->o_cksum = body->oa.o_cksum;
1333                 /* 1 RC per niobuf */
1334                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
1335                                      sizeof(__u32) * niocount);
1336         } else {
1337                 if (unlikely(cli->cl_checksum) &&
1338                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1339                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1340                                 body->oa.o_flags = 0;
1341                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1342                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1343                 }
1344                 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
1345                 /* 1 RC for the whole I/O */
1346         }
1347         ptlrpc_request_set_replen(req);
1348
1349         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1350         aa = ptlrpc_req_async_args(req);
1351         aa->aa_oa = oa;
1352         aa->aa_requested_nob = requested_nob;
1353         aa->aa_nio_count = niocount;
1354         aa->aa_page_count = page_count;
1355         aa->aa_resends = 0;
1356         aa->aa_ppga = pga;
1357         aa->aa_cli = cli;
1358         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1359         if (ocapa && reserve)
1360                 aa->aa_ocapa = capa_get(ocapa);
1361
1362         *reqp = req;
1363         RETURN(0);
1364
1365  out:
1366         ptlrpc_req_finished(req);
1367         RETURN(rc);
1368 }
1369
1370 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1371                                 __u32 client_cksum, __u32 server_cksum, int nob,
1372                                 obd_count page_count, struct brw_page **pga,
1373                                 cksum_type_t client_cksum_type)
1374 {
1375         __u32 new_cksum;
1376         char *msg;
1377         cksum_type_t cksum_type;
1378
1379         if (server_cksum == client_cksum) {
1380                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1381                 return 0;
1382         }
1383
1384         if (oa->o_valid & OBD_MD_FLFLAGS)
1385                 cksum_type = cksum_type_unpack(oa->o_flags);
1386         else
1387                 cksum_type = OBD_CKSUM_CRC32;
1388
1389         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1390                                       cksum_type);
1391
1392         if (cksum_type != client_cksum_type)
1393                 msg = "the server did not use the checksum type specified in "
1394                       "the original request - likely a protocol problem";
1395         else if (new_cksum == server_cksum)
1396                 msg = "changed on the client after we checksummed it - "
1397                       "likely false positive due to mmap IO (bug 11742)";
1398         else if (new_cksum == client_cksum)
1399                 msg = "changed in transit before arrival at OST";
1400         else
1401                 msg = "changed in transit AND doesn't match the original - "
1402                       "likely false positive due to mmap IO (bug 11742)";
1403
1404         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
1405                            LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
1406                            "["LPU64"-"LPU64"]\n",
1407                            msg, libcfs_nid2str(peer->nid),
1408                            oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
1409                            oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
1410                                                         (__u64)0,
1411                            oa->o_id,
1412                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
1413                            pga[0]->off,
1414                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1415         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1416                "client csum now %x\n", client_cksum, client_cksum_type,
1417                server_cksum, cksum_type, new_cksum);
1418         return 1;
1419 }
1420
1421 /* Note rc enters this function as number of bytes transferred */
1422 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1423 {
1424         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1425         const lnet_process_id_t *peer =
1426                         &req->rq_import->imp_connection->c_peer;
1427         struct client_obd *cli = aa->aa_cli;
1428         struct ost_body *body;
1429         __u32 client_cksum = 0;
1430         ENTRY;
1431
1432         if (rc < 0 && rc != -EDQUOT)
1433                 RETURN(rc);
1434
1435         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1436         body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
1437                                   lustre_swab_ost_body);
1438         if (body == NULL) {
1439                 CDEBUG(D_INFO, "Can't unpack body\n");
1440                 RETURN(-EPROTO);
1441         }
1442
1443         /* set/clear over quota flag for a uid/gid */
1444         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1445             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
1446                 lquota_setdq(quota_interface, cli, body->oa.o_uid,
1447                              body->oa.o_gid, body->oa.o_valid,
1448                              body->oa.o_flags);
1449
1450         if (rc < 0)
1451                 RETURN(rc);
1452
1453         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1454                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1455
1456         osc_update_grant(cli, body);
1457
1458         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1459                 if (rc > 0) {
1460                         CERROR("Unexpected +ve rc %d\n", rc);
1461                         RETURN(-EPROTO);
1462                 }
1463                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1464
1465                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1466                         RETURN(-EAGAIN);
1467
1468                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1469                     check_write_checksum(&body->oa, peer, client_cksum,
1470                                          body->oa.o_cksum, aa->aa_requested_nob,
1471                                          aa->aa_page_count, aa->aa_ppga,
1472                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1473                         RETURN(-EAGAIN);
1474
1475                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1476                                      aa->aa_page_count, aa->aa_ppga);
1477                 GOTO(out, rc);
1478         }
1479
1480         /* The rest of this function executes only for OST_READs */
1481
1482         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1483         if (rc < 0)
1484                 GOTO(out, rc);
1485
1486         if (rc > aa->aa_requested_nob) {
1487                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1488                        aa->aa_requested_nob);
1489                 RETURN(-EPROTO);
1490         }
1491
1492         if (rc != req->rq_bulk->bd_nob_transferred) {
1493                 CERROR ("Unexpected rc %d (%d transferred)\n",
1494                         rc, req->rq_bulk->bd_nob_transferred);
1495                 return (-EPROTO);
1496         }
1497
1498         if (rc < aa->aa_requested_nob)
1499                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1500
1501         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1502                 static int cksum_counter;
1503                 __u32      server_cksum = body->oa.o_cksum;
1504                 char      *via;
1505                 char      *router;
1506                 cksum_type_t cksum_type;
1507
1508                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1509                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1510                 else
1511                         cksum_type = OBD_CKSUM_CRC32;
1512                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1513                                                  aa->aa_ppga, OST_READ,
1514                                                  cksum_type);
1515
1516                 if (peer->nid == req->rq_bulk->bd_sender) {
1517                         via = router = "";
1518                 } else {
1519                         via = " via ";
1520                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1521                 }
1522
1523                 if (server_cksum == ~0 && rc > 0) {
1524                         CERROR("Protocol error: server %s set the 'checksum' "
1525                                "bit, but didn't send a checksum.  Not fatal, "
1526                                "but please notify on http://bugzilla.lustre.org/\n",
1527                                libcfs_nid2str(peer->nid));
1528                 } else if (server_cksum != client_cksum) {
1529                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1530                                            "%s%s%s inum "LPU64"/"LPU64" object "
1531                                            LPU64"/"LPU64" extent "
1532                                            "["LPU64"-"LPU64"]\n",
1533                                            req->rq_import->imp_obd->obd_name,
1534                                            libcfs_nid2str(peer->nid),
1535                                            via, router,
1536                                            body->oa.o_valid & OBD_MD_FLFID ?
1537                                                 body->oa.o_fid : (__u64)0,
1538                                            body->oa.o_valid & OBD_MD_FLFID ?
1539                                                 body->oa.o_generation :(__u64)0,
1540                                            body->oa.o_id,
1541                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1542                                                 body->oa.o_gr : (__u64)0,
1543                                            aa->aa_ppga[0]->off,
1544                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1545                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1546                                                                         1);
1547                         CERROR("client %x, server %x, cksum_type %x\n",
1548                                client_cksum, server_cksum, cksum_type);
1549                         cksum_counter = 0;
1550                         aa->aa_oa->o_cksum = client_cksum;
1551                         rc = -EAGAIN;
1552                 } else {
1553                         cksum_counter++;
1554                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1555                         rc = 0;
1556                 }
1557         } else if (unlikely(client_cksum)) {
1558                 static int cksum_missed;
1559
1560                 cksum_missed++;
1561                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1562                         CERROR("Checksum %u requested from %s but not sent\n",
1563                                cksum_missed, libcfs_nid2str(peer->nid));
1564         } else {
1565                 rc = 0;
1566         }
1567 out:
1568         if (rc >= 0)
1569                 *aa->aa_oa = body->oa;
1570
1571         RETURN(rc);
1572 }
1573
1574 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1575                             struct lov_stripe_md *lsm,
1576                             obd_count page_count, struct brw_page **pga,
1577                             struct obd_capa *ocapa)
1578 {
1579         struct ptlrpc_request *req;
1580         int                    rc;
1581         cfs_waitq_t            waitq;
1582         int                    resends = 0;
1583         struct l_wait_info     lwi;
1584
1585         ENTRY;
1586
1587         cfs_waitq_init(&waitq);
1588
1589 restart_bulk:
1590         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1591                                   page_count, pga, &req, ocapa, 0);
1592         if (rc != 0)
1593                 return (rc);
1594
1595         rc = ptlrpc_queue_wait(req);
1596
1597         if (rc == -ETIMEDOUT && req->rq_resend) {
1598                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1599                 ptlrpc_req_finished(req);
1600                 goto restart_bulk;
1601         }
1602
1603         rc = osc_brw_fini_request(req, rc);
1604
1605         ptlrpc_req_finished(req);
1606         if (osc_recoverable_error(rc)) {
1607                 resends++;
1608                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1609                         CERROR("too many resend retries, returning error\n");
1610                         RETURN(-EIO);
1611                 }
1612
1613                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1614                 l_wait_event(waitq, 0, &lwi);
1615
1616                 goto restart_bulk;
1617         }
1618
1619         RETURN (rc);
1620 }
1621
1622 int osc_brw_redo_request(struct ptlrpc_request *request,
1623                          struct osc_brw_async_args *aa)
1624 {
1625         struct ptlrpc_request *new_req;
1626         struct ptlrpc_request_set *set = request->rq_set;
1627         struct osc_brw_async_args *new_aa;
1628         struct osc_async_page *oap;
1629         int rc = 0;
1630         ENTRY;
1631
1632         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1633                 CERROR("too many resend retries, returning error\n");
1634                 RETURN(-EIO);
1635         }
1636
1637         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1638
1639         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1640                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1641                                   aa->aa_cli, aa->aa_oa,
1642                                   NULL /* lsm unused by osc currently */,
1643                                   aa->aa_page_count, aa->aa_ppga,
1644                                   &new_req, aa->aa_ocapa, 0);
1645         if (rc)
1646                 RETURN(rc);
1647
1648         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1649
1650         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1651                 if (oap->oap_request != NULL) {
1652                         LASSERTF(request == oap->oap_request,
1653                                  "request %p != oap_request %p\n",
1654                                  request, oap->oap_request);
1655                         if (oap->oap_interrupted) {
1656                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1657                                 ptlrpc_req_finished(new_req);
1658                                 RETURN(-EINTR);
1659                         }
1660                 }
1661         }
1662         /* New request takes over pga and oaps from old request.
1663          * Note that copying a list_head doesn't work, need to move it... */
1664         aa->aa_resends++;
1665         new_req->rq_interpret_reply = request->rq_interpret_reply;
1666         new_req->rq_async_args = request->rq_async_args;
1667         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1668
1669         new_aa = ptlrpc_req_async_args(new_req);
1670
1671         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1672         list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1673         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1674
1675         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1676                 if (oap->oap_request) {
1677                         ptlrpc_req_finished(oap->oap_request);
1678                         oap->oap_request = ptlrpc_request_addref(new_req);
1679                 }
1680         }
1681
1682         new_aa->aa_ocapa = aa->aa_ocapa;
1683         aa->aa_ocapa = NULL;
1684
1685         /* use ptlrpc_set_add_req is safe because interpret functions work
1686          * in check_set context. only one way exist with access to request
1687          * from different thread got -EINTR - this way protected with
1688          * cl_loi_list_lock */
1689         ptlrpc_set_add_req(set, new_req);
1690
1691         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1692
1693         DEBUG_REQ(D_INFO, new_req, "new request");
1694         RETURN(0);
1695 }
1696
1697 /*
1698  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1699  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1700  * fine for our small page arrays and doesn't require allocation.  its an
1701  * insertion sort that swaps elements that are strides apart, shrinking the
1702  * stride down until its '1' and the array is sorted.
1703  */
1704 static void sort_brw_pages(struct brw_page **array, int num)
1705 {
1706         int stride, i, j;
1707         struct brw_page *tmp;
1708
1709         if (num == 1)
1710                 return;
1711         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1712                 ;
1713
1714         do {
1715                 stride /= 3;
1716                 for (i = stride ; i < num ; i++) {
1717                         tmp = array[i];
1718                         j = i;
1719                         while (j >= stride && array[j - stride]->off > tmp->off) {
1720                                 array[j] = array[j - stride];
1721                                 j -= stride;
1722                         }
1723                         array[j] = tmp;
1724                 }
1725         } while (stride > 1);
1726 }
1727
1728 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1729 {
1730         int count = 1;
1731         int offset;
1732         int i = 0;
1733
1734         LASSERT (pages > 0);
1735         offset = pg[i]->off & ~CFS_PAGE_MASK;
1736
1737         for (;;) {
1738                 pages--;
1739                 if (pages == 0)         /* that's all */
1740                         return count;
1741
1742                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1743                         return count;   /* doesn't end on page boundary */
1744
1745                 i++;
1746                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1747                 if (offset != 0)        /* doesn't start on page boundary */
1748                         return count;
1749
1750                 count++;
1751         }
1752 }
1753
1754 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1755 {
1756         struct brw_page **ppga;
1757         int i;
1758
1759         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1760         if (ppga == NULL)
1761                 return NULL;
1762
1763         for (i = 0; i < count; i++)
1764                 ppga[i] = pga + i;
1765         return ppga;
1766 }
1767
1768 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1769 {
1770         LASSERT(ppga != NULL);
1771         OBD_FREE(ppga, sizeof(*ppga) * count);
1772 }
1773
1774 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1775                    obd_count page_count, struct brw_page *pga,
1776                    struct obd_trans_info *oti)
1777 {
1778         struct obdo *saved_oa = NULL;
1779         struct brw_page **ppga, **orig;
1780         struct obd_import *imp = class_exp2cliimp(exp);
1781         struct client_obd *cli = &imp->imp_obd->u.cli;
1782         int rc, page_count_orig;
1783         ENTRY;
1784
1785         if (cmd & OBD_BRW_CHECK) {
1786                 /* The caller just wants to know if there's a chance that this
1787                  * I/O can succeed */
1788
1789                 if (imp == NULL || imp->imp_invalid)
1790                         RETURN(-EIO);
1791                 RETURN(0);
1792         }
1793
1794         /* test_brw with a failed create can trip this, maybe others. */
1795         LASSERT(cli->cl_max_pages_per_rpc);
1796
1797         rc = 0;
1798
1799         orig = ppga = osc_build_ppga(pga, page_count);
1800         if (ppga == NULL)
1801                 RETURN(-ENOMEM);
1802         page_count_orig = page_count;
1803
1804         sort_brw_pages(ppga, page_count);
1805         while (page_count) {
1806                 obd_count pages_per_brw;
1807
1808                 if (page_count > cli->cl_max_pages_per_rpc)
1809                         pages_per_brw = cli->cl_max_pages_per_rpc;
1810                 else
1811                         pages_per_brw = page_count;
1812
1813                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1814
1815                 if (saved_oa != NULL) {
1816                         /* restore previously saved oa */
1817                         *oinfo->oi_oa = *saved_oa;
1818                 } else if (page_count > pages_per_brw) {
1819                         /* save a copy of oa (brw will clobber it) */
1820                         OBDO_ALLOC(saved_oa);
1821                         if (saved_oa == NULL)
1822                                 GOTO(out, rc = -ENOMEM);
1823                         *saved_oa = *oinfo->oi_oa;
1824                 }
1825
1826                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1827                                       pages_per_brw, ppga, oinfo->oi_capa);
1828
1829                 if (rc != 0)
1830                         break;
1831
1832                 page_count -= pages_per_brw;
1833                 ppga += pages_per_brw;
1834         }
1835
1836 out:
1837         osc_release_ppga(orig, page_count_orig);
1838
1839         if (saved_oa != NULL)
1840                 OBDO_FREE(saved_oa);
1841
1842         RETURN(rc);
1843 }
1844
1845 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1846  * the dirty accounting.  Writeback completes or truncate happens before
1847  * writing starts.  Must be called with the loi lock held. */
1848 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1849                            int sent)
1850 {
1851         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1852 }
1853
1854
1855 /* This maintains the lists of pending pages to read/write for a given object
1856  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1857  * to quickly find objects that are ready to send an RPC. */
1858 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1859                          int cmd)
1860 {
1861         int optimal;
1862         ENTRY;
1863
1864         if (lop->lop_num_pending == 0)
1865                 RETURN(0);
1866
1867         /* if we have an invalid import we want to drain the queued pages
1868          * by forcing them through rpcs that immediately fail and complete
1869          * the pages.  recovery relies on this to empty the queued pages
1870          * before canceling the locks and evicting down the llite pages */
1871         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1872                 RETURN(1);
1873
1874         /* stream rpcs in queue order as long as as there is an urgent page
1875          * queued.  this is our cheap solution for good batching in the case
1876          * where writepage marks some random page in the middle of the file
1877          * as urgent because of, say, memory pressure */
1878         if (!list_empty(&lop->lop_urgent)) {
1879                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1880                 RETURN(1);
1881         }
1882         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1883         optimal = cli->cl_max_pages_per_rpc;
1884         if (cmd & OBD_BRW_WRITE) {
1885                 /* trigger a write rpc stream as long as there are dirtiers
1886                  * waiting for space.  as they're waiting, they're not going to
1887                  * create more pages to coallesce with what's waiting.. */
1888                 if (!list_empty(&cli->cl_cache_waiters)) {
1889                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1890                         RETURN(1);
1891                 }
1892                 /* +16 to avoid triggering rpcs that would want to include pages
1893                  * that are being queued but which can't be made ready until
1894                  * the queuer finishes with the page. this is a wart for
1895                  * llite::commit_write() */
1896                 optimal += 16;
1897         }
1898         if (lop->lop_num_pending >= optimal)
1899                 RETURN(1);
1900
1901         RETURN(0);
1902 }
1903
/*
 * Make the list membership of @item match @should_be_on: append it to the
 * tail of @list if it is unlinked but should be listed, unlink it if it is
 * linked but should not be.  An item counts as unlinked when its own
 * list_head is empty.
 */
static void on_list(struct list_head *item, struct list_head *list,
                    int should_be_on)
{
        int linked = !list_empty(item);

        if (should_be_on) {
                if (!linked)
                        list_add_tail(item, list);
        } else if (linked) {
                list_del_init(item);
        }
}
1912
1913 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
1914  * can find pages to build into rpcs quickly */
1915 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
1916 {
1917         on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
1918                 lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
1919                 lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
1920
1921         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
1922                 loi->loi_write_lop.lop_num_pending);
1923
1924         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
1925                 loi->loi_read_lop.lop_num_pending);
1926 }
1927
1928 static void lop_update_pending(struct client_obd *cli,
1929                                struct loi_oap_pages *lop, int cmd, int delta)
1930 {
1931         lop->lop_num_pending += delta;
1932         if (cmd & OBD_BRW_WRITE)
1933                 cli->cl_pending_w_pages += delta;
1934         else
1935                 cli->cl_pending_r_pages += delta;
1936 }
1937
1938 /**
1939  * this is called when a sync waiter receives an interruption.  Its job is to
1940  * get the caller woken as soon as possible.  If its page hasn't been put in an
1941  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
1942  * desiring interruption which will forcefully complete the rpc once the rpc
1943  * has timed out.
1944  */
1945 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
1946 {
1947         struct loi_oap_pages *lop;
1948         struct lov_oinfo *loi;
1949         int rc = -EBUSY;
1950         ENTRY;
1951
1952         LASSERT(!oap->oap_interrupted);
1953         oap->oap_interrupted = 1;
1954
1955         /* ok, it's been put in an rpc. only one oap gets a request reference */
1956         if (oap->oap_request != NULL) {
1957                 ptlrpc_mark_interrupted(oap->oap_request);
1958                 ptlrpcd_wake(oap->oap_request);
1959                 ptlrpc_req_finished(oap->oap_request);
1960                 oap->oap_request = NULL;
1961         }
1962
1963         /*
1964          * page completion may be called only if ->cpo_prep() method was
1965          * executed by osc_io_submit(), that also adds page the to pending list
1966          */
1967         if (!list_empty(&oap->oap_pending_item)) {
1968                 list_del_init(&oap->oap_pending_item);
1969                 list_del_init(&oap->oap_urgent_item);
1970
1971                 loi = oap->oap_loi;
1972                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
1973                         &loi->loi_write_lop : &loi->loi_read_lop;
1974                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
1975                 loi_list_maint(oap->oap_cli, oap->oap_loi);
1976                 rc = oap->oap_caller_ops->ap_completion(env,
1977                                           oap->oap_caller_data,
1978                                           oap->oap_cmd, NULL, -EINTR);
1979         }
1980
1981         RETURN(rc);
1982 }
1983
1984 /* this is trying to propogate async writeback errors back up to the
1985  * application.  As an async write fails we record the error code for later if
1986  * the app does an fsync.  As long as errors persist we force future rpcs to be
1987  * sync so that the app can get a sync error and break the cycle of queueing
1988  * pages for which writeback will fail. */
1989 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1990                            int rc)
1991 {
1992         if (rc) {
1993                 if (!ar->ar_rc)
1994                         ar->ar_rc = rc;
1995
1996                 ar->ar_force_sync = 1;
1997                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1998                 return;
1999
2000         }
2001
2002         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2003                 ar->ar_force_sync = 0;
2004 }
2005
2006 void osc_oap_to_pending(struct osc_async_page *oap)
2007 {
2008         struct loi_oap_pages *lop;
2009
2010         if (oap->oap_cmd & OBD_BRW_WRITE)
2011                 lop = &oap->oap_loi->loi_write_lop;
2012         else
2013                 lop = &oap->oap_loi->loi_read_lop;
2014
2015         if (oap->oap_async_flags & ASYNC_URGENT)
2016                 list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2017         list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2018         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2019 }
2020
2021 /* this must be called holding the loi list lock to give coverage to exit_cache,
2022  * async_flag maintenance, and oap_request */
2023 static void osc_ap_completion(const struct lu_env *env,
2024                               struct client_obd *cli, struct obdo *oa,
2025                               struct osc_async_page *oap, int sent, int rc)
2026 {
2027         __u64 xid = 0;
2028
2029         ENTRY;
2030         if (oap->oap_request != NULL) {
2031                 xid = ptlrpc_req_xid(oap->oap_request);
2032                 ptlrpc_req_finished(oap->oap_request);
2033                 oap->oap_request = NULL;
2034         }
2035
2036         oap->oap_async_flags = 0;
2037         oap->oap_interrupted = 0;
2038
2039         if (oap->oap_cmd & OBD_BRW_WRITE) {
2040                 osc_process_ar(&cli->cl_ar, xid, rc);
2041                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2042         }
2043
2044         if (rc == 0 && oa != NULL) {
2045                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2046                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2047                 if (oa->o_valid & OBD_MD_FLMTIME)
2048                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2049                 if (oa->o_valid & OBD_MD_FLATIME)
2050                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2051                 if (oa->o_valid & OBD_MD_FLCTIME)
2052                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2053         }
2054
2055         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2056                                                 oap->oap_cmd, oa, rc);
2057
2058         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2059          * I/O on the page could start, but OSC calls it under lock
2060          * and thus we can add oap back to pending safely */
2061         if (rc)
2062                 /* upper layer wants to leave the page on pending queue */
2063                 osc_oap_to_pending(oap);
2064         else
2065                 osc_exit_cache(cli, oap, sent);
2066         EXIT;
2067 }
2068
2069 static int brw_interpret(const struct lu_env *env,
2070                          struct ptlrpc_request *req, void *data, int rc)
2071 {
2072         struct osc_brw_async_args *aa = data;
2073         struct client_obd *cli;
2074         int async;
2075         ENTRY;
2076
2077         rc = osc_brw_fini_request(req, rc);
2078         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2079         if (osc_recoverable_error(rc)) {
2080                 rc = osc_brw_redo_request(req, aa);
2081                 if (rc == 0)
2082                         RETURN(0);
2083         }
2084
2085         if (aa->aa_ocapa) {
2086                 capa_put(aa->aa_ocapa);
2087                 aa->aa_ocapa = NULL;
2088         }
2089
2090         cli = aa->aa_cli;
2091
2092         client_obd_list_lock(&cli->cl_loi_list_lock);
2093
2094         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2095          * is called so we know whether to go to sync BRWs or wait for more
2096          * RPCs to complete */
2097         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2098                 cli->cl_w_in_flight--;
2099         else
2100                 cli->cl_r_in_flight--;
2101
2102         async = list_empty(&aa->aa_oaps);
2103         if (!async) { /* from osc_send_oap_rpc() */
2104                 struct osc_async_page *oap, *tmp;
2105                 /* the caller may re-use the oap after the completion call so
2106                  * we need to clean it up a little */
2107                 list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
2108                         list_del_init(&oap->oap_rpc_item);
2109                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2110                 }
2111                 OBDO_FREE(aa->aa_oa);
2112         } else { /* from async_internal() */
2113                 int i;
2114                 for (i = 0; i < aa->aa_page_count; i++)
2115                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2116         }
2117         osc_wake_cache_waiters(cli);
2118         osc_check_rpcs(env, cli);
2119         client_obd_list_unlock(&cli->cl_loi_list_lock);
2120         if (!async)
2121                 cl_req_completion(env, aa->aa_clerq, rc);
2122         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2123         RETURN(rc);
2124 }
2125
2126 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2127                                             struct client_obd *cli,
2128                                             struct list_head *rpc_list,
2129                                             int page_count, int cmd)
2130 {
2131         struct ptlrpc_request *req;
2132         struct brw_page **pga = NULL;
2133         struct osc_brw_async_args *aa;
2134         struct obdo *oa = NULL;
2135         const struct obd_async_page_ops *ops = NULL;
2136         void *caller_data = NULL;
2137         struct osc_async_page *oap;
2138         struct osc_async_page *tmp;
2139         struct ost_body *body;
2140         struct cl_req *clerq = NULL;
2141         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2142         struct ldlm_lock *lock = NULL;
2143         struct cl_req_attr crattr;
2144         int i, rc;
2145
2146         ENTRY;
2147         LASSERT(!list_empty(rpc_list));
2148
2149         memset(&crattr, 0, sizeof crattr);
2150         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2151         if (pga == NULL)
2152                 GOTO(out, req = ERR_PTR(-ENOMEM));
2153
2154         OBDO_ALLOC(oa);
2155         if (oa == NULL)
2156                 GOTO(out, req = ERR_PTR(-ENOMEM));
2157
2158         i = 0;
2159         list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2160                 struct cl_page *page = osc_oap2cl_page(oap);
2161                 if (ops == NULL) {
2162                         ops = oap->oap_caller_ops;
2163                         caller_data = oap->oap_caller_data;
2164
2165                         clerq = cl_req_alloc(env, page, crt,
2166                                              1 /* only 1-object rpcs for
2167                                                 * now */);
2168                         if (IS_ERR(clerq))
2169                                 GOTO(out, req = (void *)clerq);
2170                         lock = oap->oap_ldlm_lock;
2171                 }
2172                 pga[i] = &oap->oap_brw_page;
2173                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2174                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2175                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2176                 i++;
2177                 cl_req_page_add(env, clerq, page);
2178         }
2179
2180         /* always get the data for the obdo for the rpc */
2181         LASSERT(ops != NULL);
2182         crattr.cra_oa = oa;
2183         crattr.cra_capa = NULL;
2184         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2185         if (lock) {
2186                 oa->o_handle = lock->l_remote_handle;
2187                 oa->o_valid |= OBD_MD_FLHANDLE;
2188         }
2189
2190         rc = cl_req_prep(env, clerq);
2191         if (rc != 0) {
2192                 CERROR("cl_req_prep failed: %d\n", rc);
2193                 GOTO(out, req = ERR_PTR(rc));
2194         }
2195
2196         sort_brw_pages(pga, page_count);
2197         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2198                                   pga, &req, crattr.cra_capa, 1);
2199         if (rc != 0) {
2200                 CERROR("prep_req failed: %d\n", rc);
2201                 GOTO(out, req = ERR_PTR(rc));
2202         }
2203
2204         /* Need to update the timestamps after the request is built in case
2205          * we race with setattr (locally or in queue at OST).  If OST gets
2206          * later setattr before earlier BRW (as determined by the request xid),
2207          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2208          * way to do this in a single call.  bug 10150 */
2209         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2210         cl_req_attr_set(env, clerq, &crattr,
2211                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2212
2213         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2214         aa = ptlrpc_req_async_args(req);
2215         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2216         list_splice(rpc_list, &aa->aa_oaps);
2217         CFS_INIT_LIST_HEAD(rpc_list);
2218         aa->aa_clerq = clerq;
2219 out:
2220         capa_put(crattr.cra_capa);
2221         if (IS_ERR(req)) {
2222                 if (oa)
2223                         OBDO_FREE(oa);
2224                 if (pga)
2225                         OBD_FREE(pga, sizeof(*pga) * page_count);
2226                 /* this should happen rarely and is pretty bad, it makes the
2227                  * pending list not follow the dirty order */
2228                 client_obd_list_lock(&cli->cl_loi_list_lock);
2229                 list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2230                         list_del_init(&oap->oap_rpc_item);
2231
2232                         /* queued sync pages can be torn down while the pages
2233                          * were between the pending list and the rpc */
2234                         if (oap->oap_interrupted) {
2235                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2236                                 osc_ap_completion(env, cli, NULL, oap, 0,
2237                                                   oap->oap_count);
2238                                 continue;
2239                         }
2240                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2241                 }
2242                 if (clerq && !IS_ERR(clerq))
2243                         cl_req_completion(env, clerq, PTR_ERR(req));
2244         }
2245         RETURN(req);
2246 }
2247
/**
 * prepare pages for ASYNC io and put pages in send queue.
 *
 * \param cli -
 * \param loi -
 * \param cmd - OBD_BRW_* flags
 * \param lop - pending pages
 *
 * \return zero if the pages were successfully added to the send queue.
 * \return non-zero if an error occurred.
 */
2259 static int
2260 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2261                  struct lov_oinfo *loi,
2262                  int cmd, struct loi_oap_pages *lop)
2263 {
2264         struct ptlrpc_request *req;
2265         obd_count page_count = 0;
2266         struct osc_async_page *oap = NULL, *tmp;
2267         struct osc_brw_async_args *aa;
2268         const struct obd_async_page_ops *ops;
2269         CFS_LIST_HEAD(rpc_list);
2270         unsigned int ending_offset;
2271         unsigned  starting_offset = 0;
2272         int srvlock = 0;
2273         struct cl_object *clob = NULL;
2274         ENTRY;
2275
2276         /* first we find the pages we're allowed to work with */
2277         list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2278                                  oap_pending_item) {
2279                 ops = oap->oap_caller_ops;
2280
2281                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2282                          "magic 0x%x\n", oap, oap->oap_magic);
2283
2284                 if (clob == NULL) {
2285                         /* pin object in memory, so that completion call-backs
2286                          * can be safely called under client_obd_list lock. */
2287                         clob = osc_oap2cl_page(oap)->cp_obj;
2288                         cl_object_get(clob);
2289                 }
2290
2291                 if (page_count != 0 &&
2292                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2293                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2294                                " oap %p, page %p, srvlock %u\n",
2295                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2296                         break;
2297                 }
2298                 /* in llite being 'ready' equates to the page being locked
2299                  * until completion unlocks it.  commit_write submits a page
2300                  * as not ready because its unlock will happen unconditionally
2301                  * as the call returns.  if we race with commit_write giving
2302                  * us that page we dont' want to create a hole in the page
2303                  * stream, so we stop and leave the rpc to be fired by
2304                  * another dirtier or kupdated interval (the not ready page
2305                  * will still be on the dirty list).  we could call in
2306                  * at the end of ll_file_write to process the queue again. */
2307                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2308                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2309                                                     cmd);
2310                         if (rc < 0)
2311                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2312                                                 "instead of ready\n", oap,
2313                                                 oap->oap_page, rc);
2314                         switch (rc) {
2315                         case -EAGAIN:
2316                                 /* llite is telling us that the page is still
2317                                  * in commit_write and that we should try
2318                                  * and put it in an rpc again later.  we
2319                                  * break out of the loop so we don't create
2320                                  * a hole in the sequence of pages in the rpc
2321                                  * stream.*/
2322                                 oap = NULL;
2323                                 break;
2324                         case -EINTR:
2325                                 /* the io isn't needed.. tell the checks
2326                                  * below to complete the rpc with EINTR */
2327                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2328                                 oap->oap_count = -EINTR;
2329                                 break;
2330                         case 0:
2331                                 oap->oap_async_flags |= ASYNC_READY;
2332                                 break;
2333                         default:
2334                                 LASSERTF(0, "oap %p page %p returned %d "
2335                                             "from make_ready\n", oap,
2336                                             oap->oap_page, rc);
2337                                 break;
2338                         }
2339                 }
2340                 if (oap == NULL)
2341                         break;
2342                 /*
2343                  * Page submitted for IO has to be locked. Either by
2344                  * ->ap_make_ready() or by higher layers.
2345                  */
2346 #if defined(__KERNEL__) && defined(__linux__)
2347                 {
2348                         struct cl_page *page;
2349
2350                         page = osc_oap2cl_page(oap);
2351
2352                         if (page->cp_type == CPT_CACHEABLE &&
2353                             !(PageLocked(oap->oap_page) &&
2354                               (CheckWriteback(oap->oap_page, cmd)))) {
2355                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2356                                        oap->oap_page,
2357                                        (long)oap->oap_page->flags,
2358                                        oap->oap_async_flags);
2359                                 LBUG();
2360                         }
2361                 }
2362 #endif
2363                 /* If there is a gap at the start of this page, it can't merge
2364                  * with any previous page, so we'll hand the network a
2365                  * "fragmented" page array that it can't transfer in 1 RDMA */
2366                 if (page_count != 0 && oap->oap_page_off != 0)
2367                         break;
2368
2369                 /* take the page out of our book-keeping */
2370                 list_del_init(&oap->oap_pending_item);
2371                 lop_update_pending(cli, lop, cmd, -1);
2372                 list_del_init(&oap->oap_urgent_item);
2373
2374                 if (page_count == 0)
2375                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2376                                           (PTLRPC_MAX_BRW_SIZE - 1);
2377
2378                 /* ask the caller for the size of the io as the rpc leaves. */
2379                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2380                         oap->oap_count =
2381                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2382                                                       cmd);
2383                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2384                 }
2385                 if (oap->oap_count <= 0) {
2386                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2387                                oap->oap_count);
2388                         osc_ap_completion(env, cli, NULL,
2389                                           oap, 0, oap->oap_count);
2390                         continue;
2391                 }
2392
2393                 /* now put the page back in our accounting */
2394                 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2395                 if (page_count == 0)
2396                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2397                 if (++page_count >= cli->cl_max_pages_per_rpc)
2398                         break;
2399
2400                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2401                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2402                  * have the same alignment as the initial writes that allocated
2403                  * extents on the server. */
2404                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2405                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2406                 if (ending_offset == 0)
2407                         break;
2408
2409                 /* If there is a gap at the end of this page, it can't merge
2410                  * with any subsequent pages, so we'll hand the network a
2411                  * "fragmented" page array that it can't transfer in 1 RDMA */
2412                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2413                         break;
2414         }
2415
2416         osc_wake_cache_waiters(cli);
2417
2418         loi_list_maint(cli, loi);
2419
2420         client_obd_list_unlock(&cli->cl_loi_list_lock);
2421
2422         if (clob != NULL)
2423                 cl_object_put(env, clob);
2424
2425         if (page_count == 0) {
2426                 client_obd_list_lock(&cli->cl_loi_list_lock);
2427                 RETURN(0);
2428         }
2429
2430         req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
2431         if (IS_ERR(req)) {
2432                 LASSERT(list_empty(&rpc_list));
2433                 loi_list_maint(cli, loi);
2434                 RETURN(PTR_ERR(req));
2435         }
2436
2437         aa = ptlrpc_req_async_args(req);
2438
2439         if (cmd == OBD_BRW_READ) {
2440                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2441                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2442                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2443                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2444         } else {
2445                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2446                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2447                                  cli->cl_w_in_flight);
2448                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2449                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2450         }
2451         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2452
2453         client_obd_list_lock(&cli->cl_loi_list_lock);
2454
2455         if (cmd == OBD_BRW_READ)
2456                 cli->cl_r_in_flight++;
2457         else
2458                 cli->cl_w_in_flight++;
2459
2460         /* queued sync pages can be torn down while the pages
2461          * were between the pending list and the rpc */
2462         tmp = NULL;
2463         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2464                 /* only one oap gets a request reference */
2465                 if (tmp == NULL)
2466                         tmp = oap;
2467                 if (oap->oap_interrupted && !req->rq_intr) {
2468                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2469                                oap, req);
2470                         ptlrpc_mark_interrupted(req);
2471                 }
2472         }
2473         if (tmp != NULL)
2474                 tmp->oap_request = ptlrpc_request_addref(req);
2475
2476         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2477                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2478
2479         req->rq_interpret_reply = brw_interpret;
2480         ptlrpcd_add_req(req, PSCOPE_BRW);
2481         RETURN(1);
2482 }
2483
/*
 * Emit a D_INODE debug line describing the queueing state of a lov_oinfo:
 * whether it is linked through loi_cli_item and, for the write and the read
 * page sets, the pending-page count and whether the urgent list is non-empty.
 *
 * STR is a string literal appended to the fixed format prefix; the remaining
 * arguments feed the conversions in STR.  With this (GNU named-variadic)
 * form at least one such argument has to be supplied.
 *
 * The final line of the definition carries no continuation backslash, so the
 * macro ends here no matter what is placed on the following line.
 */
#define LOI_DEBUG(LOI, STR, args...)                                     \
        CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
               !list_empty(&(LOI)->loi_cli_item),                        \
               (LOI)->loi_write_lop.lop_num_pending,                     \
               !list_empty(&(LOI)->loi_write_lop.lop_urgent),            \
               (LOI)->loi_read_lop.lop_num_pending,                      \
               !list_empty(&(LOI)->loi_read_lop.lop_urgent),             \
               args)
2492
2493 /* This is called by osc_check_rpcs() to find which objects have pages that
2494  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2495 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2496 {
2497         ENTRY;
2498         /* first return all objects which we already know to have
2499          * pages ready to be stuffed into rpcs */
2500         if (!list_empty(&cli->cl_loi_ready_list))
2501                 RETURN(list_entry(cli->cl_loi_ready_list.next,
2502                                   struct lov_oinfo, loi_cli_item));
2503
2504         /* then if we have cache waiters, return all objects with queued
2505          * writes.  This is especially important when many small files
2506          * have filled up the cache and not been fired into rpcs because
2507          * they don't pass the nr_pending/object threshhold */
2508         if (!list_empty(&cli->cl_cache_waiters) &&
2509             !list_empty(&cli->cl_loi_write_list))
2510                 RETURN(list_entry(cli->cl_loi_write_list.next,
2511                                   struct lov_oinfo, loi_write_item));
2512
2513         /* then return all queued objects when we have an invalid import
2514          * so that they get flushed */
2515         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2516                 if (!list_empty(&cli->cl_loi_write_list))
2517                         RETURN(list_entry(cli->cl_loi_write_list.next,
2518                                           struct lov_oinfo, loi_write_item));
2519                 if (!list_empty(&cli->cl_loi_read_list))
2520                         RETURN(list_entry(cli->cl_loi_read_list.next,
2521                                           struct lov_oinfo, loi_read_item));
2522         }
2523         RETURN(NULL);
2524 }
2525
2526 /* called with the loi list lock held */
2527 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2528 {
2529         struct lov_oinfo *loi;
2530         int rc = 0, race_counter = 0;
2531         ENTRY;
2532
2533         while ((loi = osc_next_loi(cli)) != NULL) {
2534                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2535
2536                 if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)
2537                         break;
2538
2539                 /* attempt some read/write balancing by alternating between
2540                  * reads and writes in an object.  The makes_rpc checks here
2541                  * would be redundant if we were getting read/write work items
2542                  * instead of objects.  we don't want send_oap_rpc to drain a
2543                  * partial read pending queue when we're given this object to
2544                  * do io on writes while there are cache waiters */
2545                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2546                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2547                                               &loi->loi_write_lop);
2548                         if (rc < 0)
2549                                 break;
2550                         if (rc > 0)
2551                                 race_counter = 0;
2552                         else
2553                                 race_counter++;
2554                 }
2555                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2556                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2557                                               &loi->loi_read_lop);
2558                         if (rc < 0)
2559                                 break;
2560                         if (rc > 0)
2561                                 race_counter = 0;
2562                         else
2563                                 race_counter++;
2564                 }
2565
2566                 /* attempt some inter-object balancing by issueing rpcs
2567                  * for each object in turn */
2568                 if (!list_empty(&loi->loi_cli_item))
2569                         list_del_init(&loi->loi_cli_item);
2570                 if (!list_empty(&loi->loi_write_item))
2571                         list_del_init(&loi->loi_write_item);
2572                 if (!list_empty(&loi->loi_read_item))
2573                         list_del_init(&loi->loi_read_item);
2574
2575                 loi_list_maint(cli, loi);
2576
2577                 /* send_oap_rpc fails with 0 when make_ready tells it to
2578                  * back off.  llite's make_ready does this when it tries
2579                  * to lock a page queued for write that is already locked.
2580                  * we want to try sending rpcs from many objects, but we
2581                  * don't want to spin failing with 0.  */
2582                 if (race_counter == 10)
2583                         break;
2584         }
2585         EXIT;
2586 }
2587
2588 /* we're trying to queue a page in the osc so we're subject to the
2589  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2590  * If the osc's queued pages are already at that limit, then we want to sleep
2591  * until there is space in the osc's queue for us.  We also may be waiting for
2592  * write credits from the OST if there are RPCs in flight that may return some
2593  * before we fall back to sync writes.
2594  *
2595  * We need this know our allocation was granted in the presence of signals */
2596 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2597 {
2598         int rc;
2599         ENTRY;
2600         client_obd_list_lock(&cli->cl_loi_list_lock);
2601         rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2602         client_obd_list_unlock(&cli->cl_loi_list_lock);
2603         RETURN(rc);
2604 };
2605
2606 /**
2607  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2608  * is available.
2609  */
2610 int osc_enter_cache_try(const struct lu_env *env,
2611                         struct client_obd *cli, struct lov_oinfo *loi,
2612                         struct osc_async_page *oap, int transient)
2613 {
2614         int has_grant;
2615
2616         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2617         if (has_grant) {
2618                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2619                 if (transient) {
2620                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2621                         atomic_inc(&obd_dirty_transit_pages);
2622                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2623                 }
2624         }
2625         return has_grant;
2626 }
2627
2628 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2629  * grant or cache space. */
2630 static int osc_enter_cache(const struct lu_env *env,
2631                            struct client_obd *cli, struct lov_oinfo *loi,
2632                            struct osc_async_page *oap)
2633 {
2634         struct osc_cache_waiter ocw;
2635         struct l_wait_info lwi = { 0 };
2636
2637         ENTRY;
2638
2639         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2640                "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
2641                cli->cl_dirty_max, obd_max_dirty_pages,
2642                cli->cl_lost_grant, cli->cl_avail_grant);
2643
2644         /* force the caller to try sync io.  this can jump the list
2645          * of queued writes and create a discontiguous rpc stream */
2646         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2647             loi->loi_ar.ar_force_sync)
2648                 RETURN(-EDQUOT);
2649
2650         /* Hopefully normal case - cache space and write credits available */
2651         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2652             atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2653             osc_enter_cache_try(env, cli, loi, oap, 0))
2654                 RETURN(0);
2655
2656         /* Make sure that there are write rpcs in flight to wait for.  This
2657          * is a little silly as this object may not have any pending but
2658          * other objects sure might. */
2659         if (cli->cl_w_in_flight) {
2660                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2661                 cfs_waitq_init(&ocw.ocw_waitq);
2662                 ocw.ocw_oap = oap;
2663                 ocw.ocw_rc = 0;
2664
2665                 loi_list_maint(cli, loi);
2666                 osc_check_rpcs(env, cli);
2667                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2668
2669                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2670                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2671
2672                 client_obd_list_lock(&cli->cl_loi_list_lock);
2673                 if (!list_empty(&ocw.ocw_entry)) {
2674                         list_del(&ocw.ocw_entry);
2675                         RETURN(-EINTR);
2676                 }
2677                 RETURN(ocw.ocw_rc);
2678         }
2679
2680         RETURN(-EDQUOT);
2681 }
2682
2683
2684 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2685                         struct lov_oinfo *loi, cfs_page_t *page,
2686                         obd_off offset, const struct obd_async_page_ops *ops,
2687                         void *data, void **res, int nocache,
2688                         struct lustre_handle *lockh)
2689 {
2690         struct osc_async_page *oap;
2691
2692         ENTRY;
2693
2694         if (!page)
2695                 return size_round(sizeof(*oap));
2696
2697         oap = *res;
2698         oap->oap_magic = OAP_MAGIC;
2699         oap->oap_cli = &exp->exp_obd->u.cli;
2700         oap->oap_loi = loi;
2701
2702         oap->oap_caller_ops = ops;
2703         oap->oap_caller_data = data;
2704
2705         oap->oap_page = page;
2706         oap->oap_obj_off = offset;
2707         if (!client_is_remote(exp) &&
2708             cfs_capable(CFS_CAP_SYS_RESOURCE))
2709                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2710
2711         LASSERT(!(offset & ~CFS_PAGE_MASK));
2712
2713         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2714         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2715         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2716         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2717
2718         spin_lock_init(&oap->oap_lock);
2719         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2720         RETURN(0);
2721 }
2722
2723 struct osc_async_page *oap_from_cookie(void *cookie)
2724 {
2725         struct osc_async_page *oap = cookie;
2726         if (oap->oap_magic != OAP_MAGIC)
2727                 return ERR_PTR(-EINVAL);
2728         return oap;
2729 };
2730
2731 int osc_queue_async_io(const struct lu_env *env,
2732                        struct obd_export *exp, struct lov_stripe_md *lsm,
2733                        struct lov_oinfo *loi, void *cookie,
2734                        int cmd, obd_off off, int count,
2735                        obd_flag brw_flags, enum async_flags async_flags)
2736 {
2737         struct client_obd *cli = &exp->exp_obd->u.cli;
2738         struct osc_async_page *oap;
2739         int rc = 0;
2740         ENTRY;
2741
2742         oap = oap_from_cookie(cookie);
2743         if (IS_ERR(oap))
2744                 RETURN(PTR_ERR(oap));
2745
2746         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2747                 RETURN(-EIO);
2748
2749         if (!list_empty(&oap->oap_pending_item) ||
2750             !list_empty(&oap->oap_urgent_item) ||
2751             !list_empty(&oap->oap_rpc_item))
2752                 RETURN(-EBUSY);
2753
2754         /* check if the file's owner/group is over quota */
2755         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2756                 struct cl_object *obj;
2757                 struct cl_attr    attr; /* XXX put attr into thread info */
2758
2759                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2760
2761                 cl_object_attr_lock(obj);
2762                 rc = cl_object_attr_get(env, obj, &attr);
2763                 cl_object_attr_unlock(obj);
2764
2765                 if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
2766                                             attr.cat_gid) == NO_QUOTA)
2767                         rc = -EDQUOT;
2768                 if (rc)
2769                         RETURN(rc);
2770         }
2771
2772         if (loi == NULL)
2773                 loi = lsm->lsm_oinfo[0];
2774
2775         client_obd_list_lock(&cli->cl_loi_list_lock);
2776
2777         LASSERT(off + count <= CFS_PAGE_SIZE);
2778         oap->oap_cmd = cmd;
2779         oap->oap_page_off = off;
2780         oap->oap_count = count;
2781         oap->oap_brw_flags = brw_flags;
2782         oap->oap_async_flags = async_flags;
2783
2784         if (cmd & OBD_BRW_WRITE) {
2785                 rc = osc_enter_cache(env, cli, loi, oap);
2786                 if (rc) {
2787                         client_obd_list_unlock(&cli->cl_loi_list_lock);
2788                         RETURN(rc);
2789                 }
2790         }
2791
2792         osc_oap_to_pending(oap);
2793         loi_list_maint(cli, loi);
2794
2795         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
2796                   cmd);
2797
2798         osc_check_rpcs(env, cli);
2799         client_obd_list_unlock(&cli->cl_loi_list_lock);
2800
2801         RETURN(0);
2802 }
2803
/* True when no bit of 'flag' is set in 'was' while some bit of 'flag' is set
 * in 'now', i.e. the flag is being newly switched on.  For a single-bit flag
 * this is (~was & now & flag), but this form is more clear :)
 *
 * Every argument is parenthesized so that compound expressions such as
 * (A | B) can be passed safely. */
#define SETTING(was, now, flag) (!((was) & (flag)) && ((now) & (flag)))
2806
2807 int osc_set_async_flags_base(struct client_obd *cli,
2808                              struct lov_oinfo *loi, struct osc_async_page *oap,
2809                              obd_flag async_flags)
2810 {
2811         struct loi_oap_pages *lop;
2812         ENTRY;
2813
2814         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2815                 RETURN(-EIO);
2816
2817         if (oap->oap_cmd & OBD_BRW_WRITE) {
2818                 lop = &loi->loi_write_lop;
2819         } else {
2820                 lop = &loi->loi_read_lop;
2821         }
2822
2823         if (list_empty(&oap->oap_pending_item))
2824                 RETURN(-EINVAL);
2825
2826         if ((oap->oap_async_flags & async_flags) == async_flags)
2827                 RETURN(0);
2828
2829         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
2830                 oap->oap_async_flags |= ASYNC_READY;
2831
2832         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT)) {
2833                 if (list_empty(&oap->oap_rpc_item)) {
2834                         list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2835                         loi_list_maint(cli, loi);
2836                 }
2837         }
2838
2839         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
2840                         oap->oap_async_flags);
2841         RETURN(0);
2842 }
2843
2844 int osc_teardown_async_page(struct obd_export *exp,
2845                             struct lov_stripe_md *lsm,
2846                             struct lov_oinfo *loi, void *cookie)
2847 {
2848         struct client_obd *cli = &exp->exp_obd->u.cli;
2849         struct loi_oap_pages *lop;
2850         struct osc_async_page *oap;
2851         int rc = 0;
2852         ENTRY;
2853
2854         oap = oap_from_cookie(cookie);
2855         if (IS_ERR(oap))
2856                 RETURN(PTR_ERR(oap));
2857
2858         if (loi == NULL)
2859                 loi = lsm->lsm_oinfo[0];
2860
2861         if (oap->oap_cmd & OBD_BRW_WRITE) {
2862                 lop = &loi->loi_write_lop;
2863         } else {
2864                 lop = &loi->loi_read_lop;
2865         }
2866
2867         client_obd_list_lock(&cli->cl_loi_list_lock);
2868
2869         if (!list_empty(&oap->oap_rpc_item))
2870                 GOTO(out, rc = -EBUSY);
2871
2872         osc_exit_cache(cli, oap, 0);
2873         osc_wake_cache_waiters(cli);
2874
2875         if (!list_empty(&oap->oap_urgent_item)) {
2876                 list_del_init(&oap->oap_urgent_item);
2877                 oap->oap_async_flags &= ~ASYNC_URGENT;
2878         }
2879         if (!list_empty(&oap->oap_pending_item)) {
2880                 list_del_init(&oap->oap_pending_item);
2881                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
2882         }
2883         loi_list_maint(cli, loi);
2884         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
2885 out:
2886         client_obd_list_unlock(&cli->cl_loi_list_lock);
2887         RETURN(rc);
2888 }
2889
2890 static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
2891                                          struct ldlm_enqueue_info *einfo,
2892                                          int flags)
2893 {
2894         void *data = einfo->ei_cbdata;
2895
2896         LASSERT(lock != NULL);
2897         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2898         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2899         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2900         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2901
2902         lock_res_and_lock(lock);
2903         spin_lock(&osc_ast_guard);
2904         LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
2905         lock->l_ast_data = data;
2906         spin_unlock(&osc_ast_guard);
2907         unlock_res_and_lock(lock);
2908 }
2909
2910 static void osc_set_data_with_check(struct lustre_handle *lockh,
2911                                     struct ldlm_enqueue_info *einfo,
2912                                     int flags)
2913 {
2914         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2915
2916         if (lock != NULL) {
2917                 osc_set_lock_data_with_check(lock, einfo, flags);
2918                 LDLM_LOCK_PUT(lock);
2919         } else
2920                 CERROR("lockh %p, data %p - client evicted?\n",
2921                        lockh, einfo->ei_cbdata);
2922 }
2923
2924 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2925                              ldlm_iterator_t replace, void *data)
2926 {
2927         struct ldlm_res_id res_id;
2928         struct obd_device *obd = class_exp2obd(exp);
2929
2930         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
2931         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2932         return 0;
2933 }
2934
2935 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2936                             obd_enqueue_update_f upcall, void *cookie,
2937                             int *flags, int rc)
2938 {
2939         int intent = *flags & LDLM_FL_HAS_INTENT;
2940         ENTRY;
2941
2942         if (intent) {
2943                 /* The request was created before ldlm_cli_enqueue call. */
2944                 if (rc == ELDLM_LOCK_ABORTED) {
2945                         struct ldlm_reply *rep;
2946                         rep = req_capsule_server_get(&req->rq_pill,
2947                                                      &RMF_DLM_REP);
2948
2949                         LASSERT(rep != NULL);
2950                         if (rep->lock_policy_res1)
2951                                 rc = rep->lock_policy_res1;
2952                 }
2953         }
2954
2955         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
2956                 *flags |= LDLM_FL_LVB_READY;
2957                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2958                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2959         }
2960
2961         /* Call the update callback. */
2962         rc = (*upcall)(cookie, rc);
2963         RETURN(rc);
2964 }
2965
2966 static int osc_enqueue_interpret(const struct lu_env *env,
2967                                  struct ptlrpc_request *req,
2968                                  struct osc_enqueue_args *aa, int rc)
2969 {
2970         struct ldlm_lock *lock;
2971         struct lustre_handle handle;
2972         __u32 mode;
2973
2974         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2975          * might be freed anytime after lock upcall has been called. */
2976         lustre_handle_copy(&handle, aa->oa_lockh);
2977         mode = aa->oa_ei->ei_mode;
2978
2979         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2980          * be valid. */
2981         lock = ldlm_handle2lock(&handle);
2982
2983         /* Take an additional reference so that a blocking AST that
2984          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2985          * to arrive after an upcall has been executed by
2986          * osc_enqueue_fini(). */
2987         ldlm_lock_addref(&handle, mode);
2988
2989         /* Complete obtaining the lock procedure. */
2990         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2991                                    mode, aa->oa_flags, aa->oa_lvb,
2992                                    sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
2993                                    &handle, rc);
2994         /* Complete osc stuff. */
2995         rc = osc_enqueue_fini(req, aa->oa_lvb,
2996                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
2997         /* Release the lock for async request. */
2998         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2999                 /*
3000                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3001                  * not already released by
3002                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3003                  */
3004                 ldlm_lock_decref(&handle, mode);
3005
3006         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3007                  aa->oa_lockh, req, aa);
3008         ldlm_lock_decref(&handle, mode);
3009         LDLM_LOCK_PUT(lock);
3010         return rc;
3011 }
3012
3013 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3014                         struct lov_oinfo *loi, int flags,
3015                         struct ost_lvb *lvb, __u32 mode, int rc)
3016 {
3017         if (rc == ELDLM_OK) {
3018                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3019                 __u64 tmp;
3020
3021                 LASSERT(lock != NULL);
3022                 loi->loi_lvb = *lvb;
3023                 tmp = loi->loi_lvb.lvb_size;
3024                 /* Extend KMS up to the end of this lock and no further
3025                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3026                 if (tmp > lock->l_policy_data.l_extent.end)
3027                         tmp = lock->l_policy_data.l_extent.end + 1;
3028                 if (tmp >= loi->loi_kms) {
3029                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3030                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3031                         loi_kms_set(loi, tmp);
3032                 } else {
3033                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3034                                    LPU64"; leaving kms="LPU64", end="LPU64,
3035                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3036                                    lock->l_policy_data.l_extent.end);
3037                 }
3038                 ldlm_lock_allow_match(lock);
3039                 LDLM_LOCK_PUT(lock);
3040         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3041                 loi->loi_lvb = *lvb;
3042                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3043                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3044                 rc = ELDLM_OK;
3045         }
3046 }
3047 EXPORT_SYMBOL(osc_update_enqueue);
3048
/* Sentinel request-set pointer.  osc_enqueue_base() compares its rqset
 * argument against this value and, when they are equal, hands the request to
 * ptlrpcd_add_req() instead of adding it to a caller-supplied set. */
struct ptlrpc_request_set *PTLRPCD_SET = (struct ptlrpc_request_set *)1;
3050
3051 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3052  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3053  * other synchronous requests, however keeping some locks and trying to obtain
3054  * others may take a considerable amount of time in a case of ost failure; and
3055  * when other sync requests do not get released lock from a client, the client
3056  * is excluded from the cluster -- such scenarious make the life difficult, so
3057  * release locks just after they are obtained. */
3058 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3059                      int *flags, ldlm_policy_data_t *policy,
3060                      struct ost_lvb *lvb, int kms_valid,
3061                      obd_enqueue_update_f upcall, void *cookie,
3062                      struct ldlm_enqueue_info *einfo,
3063                      struct lustre_handle *lockh,
3064                      struct ptlrpc_request_set *rqset, int async)
3065 {
3066         struct obd_device *obd = exp->exp_obd;
3067         struct ptlrpc_request *req = NULL;
3068         int intent = *flags & LDLM_FL_HAS_INTENT;
3069         ldlm_mode_t mode;
3070         int rc;
3071         ENTRY;
3072
3073         /* Filesystem lock extents are extended to page boundaries so that
3074          * dealing with the page cache is a little smoother.  */
3075         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3076         policy->l_extent.end |= ~CFS_PAGE_MASK;
3077
3078         /*
3079          * kms is not valid when either object is completely fresh (so that no
3080          * locks are cached), or object was evicted. In the latter case cached
3081          * lock cannot be used, because it would prime inode state with
3082          * potentially stale LVB.
3083          */
3084         if (!kms_valid)
3085                 goto no_match;
3086
3087         /* Next, search for already existing extent locks that will cover us */
3088         /* If we're trying to read, we also search for an existing PW lock.  The
3089          * VFS and page cache already protect us locally, so lots of readers/
3090          * writers can share a single PW lock.
3091          *
3092          * There are problems with conversion deadlocks, so instead of
3093          * converting a read lock to a write lock, we'll just enqueue a new
3094          * one.
3095          *
3096          * At some point we should cancel the read lock instead of making them
3097          * send us a blocking callback, but there are problems with canceling
3098          * locks out from other users right now, too. */
3099         mode = einfo->ei_mode;
3100         if (einfo->ei_mode == LCK_PR)
3101                 mode |= LCK_PW;
3102         mode = ldlm_lock_match(obd->obd_namespace,
3103                                *flags | LDLM_FL_LVB_READY, res_id,
3104                                einfo->ei_type, policy, mode, lockh, 0);
3105         if (mode) {
3106                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3107
3108                 if (matched->l_ast_data == NULL ||
3109                     matched->l_ast_data == einfo->ei_cbdata) {
3110                         /* addref the lock only if not async requests and PW
3111                          * lock is matched whereas we asked for PR. */
3112                         if (!rqset && einfo->ei_mode != mode)
3113                                 ldlm_lock_addref(lockh, LCK_PR);
3114                         osc_set_lock_data_with_check(matched, einfo, *flags);
3115                         if (intent) {
3116                                 /* I would like to be able to ASSERT here that
3117                                  * rss <= kms, but I can't, for reasons which
3118                                  * are explained in lov_enqueue() */
3119                         }
3120
3121                         /* We already have a lock, and it's referenced */
3122                         (*upcall)(cookie, ELDLM_OK);
3123
3124                         /* For async requests, decref the lock. */
3125                         if (einfo->ei_mode != mode)
3126                                 ldlm_lock_decref(lockh, LCK_PW);
3127                         else if (rqset)
3128                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3129                         LDLM_LOCK_PUT(matched);
3130                         RETURN(ELDLM_OK);
3131                 } else
3132                         ldlm_lock_decref(lockh, mode);
3133                 LDLM_LOCK_PUT(matched);
3134         }
3135
3136  no_match:
3137         if (intent) {
3138                 CFS_LIST_HEAD(cancels);
3139                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3140                                            &RQF_LDLM_ENQUEUE_LVB);
3141                 if (req == NULL)
3142                         RETURN(-ENOMEM);
3143
3144                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3145                 if (rc)
3146                         RETURN(rc);
3147
3148                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3149                                      sizeof *lvb);
3150                 ptlrpc_request_set_replen(req);
3151         }
3152
3153         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3154         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3155
3156         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3157                               sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
3158         if (rqset) {
3159                 if (!rc) {
3160                         struct osc_enqueue_args *aa;
3161                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3162                         aa = ptlrpc_req_async_args(req);
3163                         aa->oa_ei = einfo;
3164                         aa->oa_exp = exp;
3165                         aa->oa_flags  = flags;
3166                         aa->oa_upcall = upcall;
3167                         aa->oa_cookie = cookie;
3168                         aa->oa_lvb    = lvb;
3169                         aa->oa_lockh  = lockh;
3170
3171                         req->rq_interpret_reply =
3172                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3173                         if (rqset == PTLRPCD_SET)
3174                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3175                         else
3176                                 ptlrpc_set_add_req(rqset, req);
3177                 } else if (intent) {
3178                         ptlrpc_req_finished(req);
3179                 }
3180                 RETURN(rc);
3181         }
3182
3183         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3184         if (intent)
3185                 ptlrpc_req_finished(req);
3186
3187         RETURN(rc);
3188 }
3189
3190 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3191                        struct ldlm_enqueue_info *einfo,
3192                        struct ptlrpc_request_set *rqset)
3193 {
3194         struct ldlm_res_id res_id;
3195         int rc;
3196         ENTRY;
3197
3198         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3199                            oinfo->oi_md->lsm_object_gr, &res_id);
3200
3201         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3202                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3203                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3204                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3205                               rqset, rqset != NULL);
3206         RETURN(rc);
3207 }
3208
3209 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3210                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3211                    int *flags, void *data, struct lustre_handle *lockh,
3212                    int unref)
3213 {
3214         struct obd_device *obd = exp->exp_obd;
3215         int lflags = *flags;
3216         ldlm_mode_t rc;
3217         ENTRY;
3218
3219         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3220                 RETURN(-EIO);
3221
3222         /* Filesystem lock extents are extended to page boundaries so that
3223          * dealing with the page cache is a little smoother */
3224         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3225         policy->l_extent.end |= ~CFS_PAGE_MASK;
3226
3227         /* Next, search for already existing extent locks that will cover us */
3228         /* If we're trying to read, we also search for an existing PW lock.  The
3229          * VFS and page cache already protect us locally, so lots of readers/
3230          * writers can share a single PW lock. */
3231         rc = mode;
3232         if (mode == LCK_PR)
3233                 rc |= LCK_PW;
3234         rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
3235                              res_id, type, policy, rc, lockh, unref);
3236         if (rc) {
3237                 if (data != NULL)
3238                         osc_set_data_with_check(lockh, data, lflags);
3239                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3240                         ldlm_lock_addref(lockh, LCK_PR);
3241                         ldlm_lock_decref(lockh, LCK_PW);
3242                 }
3243                 RETURN(rc);
3244         }
3245         RETURN(rc);
3246 }
3247
3248 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3249 {
3250         ENTRY;
3251
3252         if (unlikely(mode == LCK_GROUP))
3253                 ldlm_lock_decref_and_cancel(lockh, mode);
3254         else
3255                 ldlm_lock_decref(lockh, mode);
3256
3257         RETURN(0);
3258 }
3259
3260 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3261                       __u32 mode, struct lustre_handle *lockh)
3262 {
3263         ENTRY;
3264         RETURN(osc_cancel_base(lockh, mode));
3265 }
3266
3267 static int osc_cancel_unused(struct obd_export *exp,
3268                              struct lov_stripe_md *lsm, int flags,
3269                              void *opaque)
3270 {
3271         struct obd_device *obd = class_exp2obd(exp);
3272         struct ldlm_res_id res_id, *resp = NULL;
3273
3274         if (lsm != NULL) {
3275                 resp = osc_build_res_name(lsm->lsm_object_id,
3276                                           lsm->lsm_object_gr, &res_id);
3277         }
3278
3279         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3280 }
3281
3282 static int osc_statfs_interpret(const struct lu_env *env,
3283                                 struct ptlrpc_request *req,
3284                                 struct osc_async_args *aa, int rc)
3285 {
3286         struct obd_statfs *msfs;
3287         ENTRY;
3288
3289         if (rc != 0)
3290                 GOTO(out, rc);
3291
3292         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3293         if (msfs == NULL) {
3294                 GOTO(out, rc = -EPROTO);
3295         }
3296
3297         *aa->aa_oi->oi_osfs = *msfs;
3298 out:
3299         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3300         RETURN(rc);
3301 }
3302
3303 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3304                             __u64 max_age, struct ptlrpc_request_set *rqset)
3305 {
3306         struct ptlrpc_request *req;
3307         struct osc_async_args *aa;
3308         int                    rc;
3309         ENTRY;
3310
3311         /* We could possibly pass max_age in the request (as an absolute
3312          * timestamp or a "seconds.usec ago") so the target can avoid doing
3313          * extra calls into the filesystem if that isn't necessary (e.g.
3314          * during mount that would help a bit).  Having relative timestamps
3315          * is not so great if request processing is slow, while absolute
3316          * timestamps are not ideal because they need time synchronization. */
3317         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3318         if (req == NULL)
3319                 RETURN(-ENOMEM);
3320
3321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3322         if (rc) {
3323                 ptlrpc_request_free(req);
3324                 RETURN(rc);
3325         }
3326         ptlrpc_request_set_replen(req);
3327         req->rq_request_portal = OST_CREATE_PORTAL;
3328         ptlrpc_at_set_req_timeout(req);
3329
3330         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3331                 /* procfs requests not want stat in wait for avoid deadlock */
3332                 req->rq_no_resend = 1;
3333                 req->rq_no_delay = 1;
3334         }
3335
3336         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3337         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3338         aa = ptlrpc_req_async_args(req);
3339         aa->aa_oi = oinfo;
3340
3341         ptlrpc_set_add_req(rqset, req);
3342         RETURN(0);
3343 }
3344
3345 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3346                       __u64 max_age, __u32 flags)
3347 {
3348         struct obd_statfs     *msfs;
3349         struct ptlrpc_request *req;
3350         struct obd_import     *imp = NULL;
3351         int rc;
3352         ENTRY;
3353
3354         /*Since the request might also come from lprocfs, so we need
3355          *sync this with client_disconnect_export Bug15684*/
3356         down_read(&obd->u.cli.cl_sem);
3357         if (obd->u.cli.cl_import)
3358                 imp = class_import_get(obd->u.cli.cl_import);
3359         up_read(&obd->u.cli.cl_sem);
3360         if (!imp)
3361                 RETURN(-ENODEV);
3362
3363         /* We could possibly pass max_age in the request (as an absolute
3364          * timestamp or a "seconds.usec ago") so the target can avoid doing
3365          * extra calls into the filesystem if that isn't necessary (e.g.
3366          * during mount that would help a bit).  Having relative timestamps
3367          * is not so great if request processing is slow, while absolute
3368          * timestamps are not ideal because they need time synchronization. */
3369         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3370
3371         class_import_put(imp);
3372
3373         if (req == NULL)
3374                 RETURN(-ENOMEM);
3375
3376         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3377         if (rc) {
3378                 ptlrpc_request_free(req);
3379                 RETURN(rc);
3380         }
3381         ptlrpc_request_set_replen(req);
3382         req->rq_request_portal = OST_CREATE_PORTAL;
3383         ptlrpc_at_set_req_timeout(req);
3384
3385         if (flags & OBD_STATFS_NODELAY) {
3386                 /* procfs requests not want stat in wait for avoid deadlock */
3387                 req->rq_no_resend = 1;
3388                 req->rq_no_delay = 1;
3389         }
3390
3391         rc = ptlrpc_queue_wait(req);
3392         if (rc)
3393                 GOTO(out, rc);
3394
3395         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3396         if (msfs == NULL) {
3397                 GOTO(out, rc = -EPROTO);
3398         }
3399
3400         *osfs = *msfs;
3401
3402         EXIT;
3403  out:
3404         ptlrpc_req_finished(req);
3405         return rc;
3406 }
3407
3408 /* Retrieve object striping information.
3409  *
3410  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3411  * the maximum number of OST indices which will fit in the user buffer.
3412  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3413  */
3414 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3415 {
3416         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3417         struct lov_user_md_v3 lum, *lumk;
3418         struct lov_user_ost_data_v1 *lmm_objects;
3419         int rc = 0, lum_size;
3420         ENTRY;
3421
3422         if (!lsm)
3423                 RETURN(-ENODATA);
3424
3425         /* we only need the header part from user space to get lmm_magic and
3426          * lmm_stripe_count, (the header part is common to v1 and v3) */
3427         lum_size = sizeof(struct lov_user_md_v1);
3428         if (copy_from_user(&lum, lump, lum_size))
3429                 RETURN(-EFAULT);
3430
3431         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3432             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3433                 RETURN(-EINVAL);
3434
3435         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3436         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3437         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3438         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3439
3440         /* we can use lov_mds_md_size() to compute lum_size
3441          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3442         if (lum.lmm_stripe_count > 0) {
3443                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3444                 OBD_ALLOC(lumk, lum_size);
3445                 if (!lumk)
3446                         RETURN(-ENOMEM);
3447
3448                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3449                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3450                 else
3451                         lmm_objects = &(lumk->lmm_objects[0]);
3452                 lmm_objects->l_object_id = lsm->lsm_object_id;
3453         } else {
3454                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3455                 lumk = &lum;
3456         }
3457
3458         lumk->lmm_object_id = lsm->lsm_object_id;
3459         lumk->lmm_object_gr = lsm->lsm_object_gr;
3460         lumk->lmm_stripe_count = 1;
3461
3462         if (copy_to_user(lump, lumk, lum_size))
3463                 rc = -EFAULT;
3464
3465         if (lumk != &lum)
3466                 OBD_FREE(lumk, lum_size);
3467
3468         RETURN(rc);
3469 }
3470
3471
3472 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3473                          void *karg, void *uarg)
3474 {
3475         struct obd_device *obd = exp->exp_obd;
3476         struct obd_ioctl_data *data = karg;
3477         int err = 0;
3478         ENTRY;
3479
3480         if (!try_module_get(THIS_MODULE)) {
3481                 CERROR("Can't get module. Is it alive?");
3482                 return -EINVAL;
3483         }
3484         switch (cmd) {
3485         case OBD_IOC_LOV_GET_CONFIG: {
3486                 char *buf;
3487                 struct lov_desc *desc;
3488                 struct obd_uuid uuid;
3489
3490                 buf = NULL;
3491                 len = 0;
3492                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3493                         GOTO(out, err = -EINVAL);
3494
3495                 data = (struct obd_ioctl_data *)buf;
3496
3497                 if (sizeof(*desc) > data->ioc_inllen1) {
3498                         obd_ioctl_freedata(buf, len);
3499                         GOTO(out, err = -EINVAL);
3500                 }
3501
3502                 if (data->ioc_inllen2 < sizeof(uuid)) {
3503                         obd_ioctl_freedata(buf, len);
3504                         GOTO(out, err = -EINVAL);
3505                 }
3506
3507                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3508                 desc->ld_tgt_count = 1;
3509                 desc->ld_active_tgt_count = 1;
3510                 desc->ld_default_stripe_count = 1;
3511                 desc->ld_default_stripe_size = 0;
3512                 desc->ld_default_stripe_offset = 0;
3513                 desc->ld_pattern = 0;
3514                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3515
3516                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3517
3518                 err = copy_to_user((void *)uarg, buf, len);
3519                 if (err)
3520                         err = -EFAULT;
3521                 obd_ioctl_freedata(buf, len);
3522                 GOTO(out, err);
3523         }
3524         case LL_IOC_LOV_SETSTRIPE:
3525                 err = obd_alloc_memmd(exp, karg);
3526                 if (err > 0)
3527                         err = 0;
3528                 GOTO(out, err);
3529         case LL_IOC_LOV_GETSTRIPE:
3530                 err = osc_getstripe(karg, uarg);
3531                 GOTO(out, err);
3532         case OBD_IOC_CLIENT_RECOVER:
3533                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3534                                             data->ioc_inlbuf1);
3535                 if (err > 0)
3536                         err = 0;
3537                 GOTO(out, err);
3538         case IOC_OSC_SET_ACTIVE:
3539                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3540                                                data->ioc_offset);
3541                 GOTO(out, err);
3542         case OBD_IOC_POLL_QUOTACHECK:
3543                 err = lquota_poll_check(quota_interface, exp,
3544                                         (struct if_quotacheck *)karg);
3545                 GOTO(out, err);
3546         case OBD_IOC_PING_TARGET:
3547                 err = ptlrpc_obd_ping(obd);
3548                 GOTO(out, err);
3549         default:
3550                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3551                        cmd, cfs_curproc_comm());
3552                 GOTO(out, err = -ENOTTY);
3553         }
3554 out:
3555         module_put(THIS_MODULE);
3556         return err;
3557 }
3558
3559 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3560                         void *key, __u32 *vallen, void *val,
3561                         struct lov_stripe_md *lsm)
3562 {
3563         ENTRY;
3564         if (!vallen || !val)
3565                 RETURN(-EFAULT);
3566
3567         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3568                 __u32 *stripe = val;
3569                 *vallen = sizeof(*stripe);
3570                 *stripe = 0;
3571                 RETURN(0);
3572         } else if (KEY_IS(KEY_LAST_ID)) {
3573                 struct ptlrpc_request *req;
3574                 obd_id                *reply;
3575                 char                  *tmp;
3576                 int                    rc;
3577
3578                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3579                                            &RQF_OST_GET_INFO_LAST_ID);
3580                 if (req == NULL)
3581                         RETURN(-ENOMEM);
3582
3583                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3584                                      RCL_CLIENT, keylen);
3585                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3586                 if (rc) {
3587                         ptlrpc_request_free(req);
3588                         RETURN(rc);
3589                 }
3590
3591                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3592                 memcpy(tmp, key, keylen);
3593
3594                 ptlrpc_request_set_replen(req);
3595                 rc = ptlrpc_queue_wait(req);
3596                 if (rc)
3597                         GOTO(out, rc);
3598
3599                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3600                 if (reply == NULL)
3601                         GOTO(out, rc = -EPROTO);
3602
3603                 *((obd_id *)val) = *reply;
3604         out:
3605                 ptlrpc_req_finished(req);
3606                 RETURN(rc);
3607         } else if (KEY_IS(KEY_FIEMAP)) {
3608                 struct ptlrpc_request *req;
3609                 struct ll_user_fiemap *reply;
3610                 char *tmp;
3611                 int rc;
3612
3613                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3614                                            &RQF_OST_GET_INFO_FIEMAP);
3615                 if (req == NULL)
3616                         RETURN(-ENOMEM);
3617
3618                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3619                                      RCL_CLIENT, keylen);
3620                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3621                                      RCL_CLIENT, *vallen);
3622                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3623                                      RCL_SERVER, *vallen);
3624
3625                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3626                 if (rc) {
3627                         ptlrpc_request_free(req);
3628                         RETURN(rc);
3629                 }
3630
3631                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3632                 memcpy(tmp, key, keylen);
3633                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3634                 memcpy(tmp, val, *vallen);
3635
3636                 ptlrpc_request_set_replen(req);
3637                 rc = ptlrpc_queue_wait(req);
3638                 if (rc)
3639                         GOTO(out1, rc);
3640
3641                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3642                 if (reply == NULL)
3643                         GOTO(out1, rc = -EPROTO);
3644
3645                 memcpy(val, reply, *vallen);
3646         out1:
3647                 ptlrpc_req_finished(req);
3648
3649                 RETURN(rc);
3650         }
3651
3652         RETURN(-EINVAL);
3653 }
3654
3655 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
3656                                           struct ptlrpc_request *req,
3657                                           void *aa, int rc)
3658 {
3659         struct llog_ctxt *ctxt;
3660         struct obd_import *imp = req->rq_import;
3661         ENTRY;
3662
3663         if (rc != 0)
3664                 RETURN(rc);
3665
3666         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
3667         if (ctxt) {
3668                 if (rc == 0)
3669                         rc = llog_initiator_connect(ctxt);
3670                 else
3671                         CERROR("cannot establish connection for "
3672                                "ctxt %p: %d\n", ctxt, rc);
3673         }
3674
3675         llog_ctxt_put(ctxt);
3676         spin_lock(&imp->imp_lock);
3677         imp->imp_server_timeout = 1;
3678         imp->imp_pingable = 1;
3679         spin_unlock(&imp->imp_lock);
3680         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
3681
3682         RETURN(rc);
3683 }
3684
3685 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
3686                               void *key, obd_count vallen, void *val,
3687                               struct ptlrpc_request_set *set)
3688 {
3689         struct ptlrpc_request *req;
3690         struct obd_device     *obd = exp->exp_obd;
3691         struct obd_import     *imp = class_exp2cliimp(exp);
3692         char                  *tmp;
3693         int                    rc;
3694         ENTRY;
3695
3696         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3697
3698         if (KEY_IS(KEY_NEXT_ID)) {
3699                 if (vallen != sizeof(obd_id))
3700                         RETURN(-ERANGE);
3701                 if (val == NULL)
3702                         RETURN(-EINVAL);
3703                 obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
3704                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
3705                        exp->exp_obd->obd_name,
3706                        obd->u.cli.cl_oscc.oscc_next_id);
3707
3708                 RETURN(0);
3709         }
3710
3711         if (KEY_IS(KEY_UNLINKED)) {
3712                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3713                 spin_lock(&oscc->oscc_lock);
3714                 oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3715                 spin_unlock(&oscc->oscc_lock);
3716                 RETURN(0);
3717         }
3718
3719         if (KEY_IS(KEY_INIT_RECOV)) {
3720                 if (vallen != sizeof(int))
3721                         RETURN(-EINVAL);
3722                 spin_lock(&imp->imp_lock);
3723                 imp->imp_initial_recov = *(int *)val;
3724                 spin_unlock(&imp->imp_lock);
3725                 CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
3726                        exp->exp_obd->obd_name,
3727                        imp->imp_initial_recov);
3728                 RETURN(0);
3729         }
3730
3731         if (KEY_IS(KEY_CHECKSUM)) {
3732                 if (vallen != sizeof(int))
3733                         RETURN(-EINVAL);
3734                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3735                 RETURN(0);
3736         }
3737
3738         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3739                 sptlrpc_conf_client_adapt(obd);
3740                 RETURN(0);
3741         }
3742
3743         if (KEY_IS(KEY_FLUSH_CTX)) {
3744                 sptlrpc_import_flush_my_ctx(imp);
3745                 RETURN(0);
3746         }
3747
3748         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3749                 RETURN(-EINVAL);
3750
3751         /* We pass all other commands directly to OST. Since nobody calls osc
3752            methods directly and everybody is supposed to go through LOV, we
3753            assume lov checked invalid values for us.
3754            The only recognised values so far are evict_by_nid and mds_conn.
3755            Even if something bad goes through, we'd get a -EINVAL from OST
3756            anyway. */
3757
3758         if (KEY_IS(KEY_GRANT_SHRINK))  
3759                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO); 
3760         else 
3761                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
3762         
3763         if (req == NULL)
3764                 RETURN(-ENOMEM);
3765
3766         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3767                              RCL_CLIENT, keylen);
3768         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3769                              RCL_CLIENT, vallen);
3770         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3771         if (rc) {
3772                 ptlrpc_request_free(req);
3773                 RETURN(rc);
3774         }
3775
3776         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3777         memcpy(tmp, key, keylen);
3778         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
3779         memcpy(tmp, val, vallen);
3780
3781         if (KEY_IS(KEY_MDS_CONN)) {
3782                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3783
3784                 oscc->oscc_oa.o_gr = (*(__u32 *)val);
3785                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
3786                 LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
3787                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
3788         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
3789                 struct osc_grant_args *aa;
3790                 struct obdo *oa;
3791
3792                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3793                 aa = ptlrpc_req_async_args(req);
3794                 OBD_ALLOC_PTR(oa);
3795                 if (!oa) {
3796                         ptlrpc_req_finished(req);
3797                         RETURN(-ENOMEM);
3798                 }
3799                 *oa = ((struct ost_body *)val)->oa;
3800                 aa->aa_oa = oa;
3801                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3802         }
3803         
3804         ptlrpc_request_set_replen(req);
3805         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3806                 LASSERT(set != NULL);
3807                 ptlrpc_set_add_req(set, req);
3808                 ptlrpc_check_set(NULL, set);
3809         } else 
3810                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3811         
3812         RETURN(0);
3813 }
3814
3815
3816 static struct llog_operations osc_size_repl_logops = {
3817         lop_cancel: llog_obd_repl_cancel
3818 };
3819
3820 static struct llog_operations osc_mds_ost_orig_logops;
3821 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3822                          struct obd_device *tgt, int count,
3823                          struct llog_catid *catid, struct obd_uuid *uuid)
3824 {
3825         int rc;
3826         ENTRY;
3827
3828         LASSERT(olg == &obd->obd_olg);
3829         spin_lock(&obd->obd_dev_lock);
3830         if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
3831                 osc_mds_ost_orig_logops = llog_lvfs_ops;
3832                 osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
3833                 osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
3834                 osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
3835                 osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
3836         }
3837         spin_unlock(&obd->obd_dev_lock);
3838
3839         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
3840                         &catid->lci_logid, &osc_mds_ost_orig_logops);
3841         if (rc) {
3842                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
3843                 GOTO(out, rc);
3844         }
3845
3846         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
3847                         NULL, &osc_size_repl_logops);
3848         if (rc) {
3849                 struct llog_ctxt *ctxt =
3850                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3851                 if (ctxt)
3852                         llog_cleanup(ctxt);
3853                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
3854         }
3855         GOTO(out, rc);
3856 out:
3857         if (rc) {
3858                 CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
3859                        obd->obd_name, tgt->obd_name, count, catid, rc);
3860                 CERROR("logid "LPX64":0x%x\n",
3861                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
3862         }
3863         return rc;
3864 }
3865
3866 static int osc_llog_finish(struct obd_device *obd, int count)
3867 {
3868         struct llog_ctxt *ctxt;
3869         int rc = 0, rc2 = 0;
3870         ENTRY;
3871
3872         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3873         if (ctxt)
3874                 rc = llog_cleanup(ctxt);
3875
3876         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3877         if (ctxt)
3878                 rc2 = llog_cleanup(ctxt);
3879         if (!rc)
3880                 rc = rc2;
3881
3882         RETURN(rc);
3883 }
3884
3885 static int osc_reconnect(const struct lu_env *env,
3886                          struct obd_export *exp, struct obd_device *obd,
3887                          struct obd_uuid *cluuid,
3888                          struct obd_connect_data *data,
3889                          void *localdata)
3890 {
3891         struct client_obd *cli = &obd->u.cli;
3892
3893         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3894                 long lost_grant;
3895
3896                 client_obd_list_lock(&cli->cl_loi_list_lock);
3897                 data->ocd_grant = cli->cl_avail_grant ?:
3898                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3899                 lost_grant = cli->cl_lost_grant;
3900                 cli->cl_lost_grant = 0;
3901                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3902
3903                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
3904                        "cl_lost_grant: %ld\n", data->ocd_grant,
3905                        cli->cl_avail_grant, lost_grant);
3906                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3907                        " ocd_grant: %d\n", data->ocd_connect_flags,
3908                        data->ocd_version, data->ocd_grant);
3909         }
3910
3911         RETURN(0);
3912 }
3913
3914 static int osc_disconnect(struct obd_export *exp)
3915 {
3916         struct obd_device *obd = class_exp2obd(exp);
3917         struct llog_ctxt  *ctxt;
3918         int rc;
3919
3920         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3921         if (ctxt) {
3922                 if (obd->u.cli.cl_conn_count == 1) {
3923                         /* Flush any remaining cancel messages out to the
3924                          * target */
3925                         llog_sync(ctxt, exp);
3926                 }
3927                 llog_ctxt_put(ctxt);
3928         } else {
3929                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3930                        obd);
3931         }
3932
3933         osc_del_shrink_grant(&obd->u.cli);
3934         rc = client_disconnect_export(exp);
3935         return rc;
3936 }
3937
3938 static int osc_import_event(struct obd_device *obd,
3939                             struct obd_import *imp,
3940                             enum obd_import_event event)
3941 {
3942         struct client_obd *cli;
3943         int rc = 0;
3944
3945         ENTRY;
3946         LASSERT(imp->imp_obd == obd);
3947
3948         switch (event) {
3949         case IMP_EVENT_DISCON: {
3950                 /* Only do this on the MDS OSC's */
3951                 if (imp->imp_server_timeout) {
3952                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3953
3954                         spin_lock(&oscc->oscc_lock);
3955                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
3956                         spin_unlock(&oscc->oscc_lock);
3957                 }
3958                 cli = &obd->u.cli;
3959                 client_obd_list_lock(&cli->cl_loi_list_lock);
3960                 cli->cl_avail_grant = 0;
3961                 cli->cl_lost_grant = 0;
3962                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3963                 break;
3964         }
3965         case IMP_EVENT_INACTIVE: {
3966                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3967                 break;
3968         }
3969         case IMP_EVENT_INVALIDATE: {
3970                 struct ldlm_namespace *ns = obd->obd_namespace;
3971                 struct lu_env         *env;
3972                 int                    refcheck;
3973
3974                 env = cl_env_get(&refcheck);
3975                 if (!IS_ERR(env)) {
3976                         /* Reset grants */
3977                         cli = &obd->u.cli;
3978                         client_obd_list_lock(&cli->cl_loi_list_lock);
3979                         /* all pages go to failing rpcs due to the invalid
3980                          * import */
3981                         osc_check_rpcs(env, cli);
3982                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3983
3984                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3985                         cl_env_put(env, &refcheck);
3986                 } else
3987                         rc = PTR_ERR(env);
3988                 break;
3989         }
3990         case IMP_EVENT_ACTIVE: {
3991                 /* Only do this on the MDS OSC's */
3992                 if (imp->imp_server_timeout) {
3993                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
3994
3995                         spin_lock(&oscc->oscc_lock);
3996                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
3997                         spin_unlock(&oscc->oscc_lock);
3998                 }
3999                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4000                 break;
4001         }
4002         case IMP_EVENT_OCD: {
4003                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4004
4005                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4006                         osc_init_grant(&obd->u.cli, ocd);
4007
4008                 /* See bug 7198 */
4009                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4010                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4011
4012                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4013                 break;
4014         }
4015         default:
4016                 CERROR("Unknown import event %d\n", event);
4017                 LBUG();
4018         }
4019         RETURN(rc);
4020 }
4021
4022 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4023 {
4024         int rc;
4025         ENTRY;
4026
4027         ENTRY;
4028         rc = ptlrpcd_addref();
4029         if (rc)
4030                 RETURN(rc);
4031
4032         rc = client_obd_setup(obd, lcfg);
4033         if (rc) {
4034                 ptlrpcd_decref();
4035         } else {
4036                 struct lprocfs_static_vars lvars = { 0 };
4037                 struct client_obd *cli = &obd->u.cli;
4038
4039                 lprocfs_osc_init_vars(&lvars);
4040                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4041                         lproc_osc_attach_seqstat(obd);
4042                         sptlrpc_lprocfs_cliobd_attach(obd);
4043                         ptlrpc_lprocfs_register_obd(obd);
4044                 }
4045
4046                 oscc_init(obd);
4047                 /* We need to allocate a few requests more, because
4048                    brw_interpret tries to create new requests before freeing
4049                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4050                    reserved, but I afraid that might be too much wasted RAM
4051                    in fact, so 2 is just my guess and still should work. */
4052                 cli->cl_import->imp_rq_pool =
4053                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4054                                             OST_MAXREQSIZE,
4055                                             ptlrpc_add_rqs_to_pool);
4056                 
4057                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4058                 sema_init(&cli->cl_grant_sem, 1);
4059         }
4060
4061         RETURN(rc);
4062 }
4063
4064 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4065 {
4066         int rc = 0;
4067         ENTRY;
4068
4069         switch (stage) {
4070         case OBD_CLEANUP_EARLY: {
4071                 struct obd_import *imp;
4072                 imp = obd->u.cli.cl_import;
4073                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4074                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4075                 ptlrpc_deactivate_import(imp);
4076                 spin_lock(&imp->imp_lock);
4077                 imp->imp_pingable = 0;
4078                 spin_unlock(&imp->imp_lock);
4079                 break;
4080         }
4081         case OBD_CLEANUP_EXPORTS: {
4082                 /* If we set up but never connected, the
4083                    client import will not have been cleaned. */
4084                 if (obd->u.cli.cl_import) {
4085                         struct obd_import *imp;
4086                         down_write(&obd->u.cli.cl_sem);
4087                         imp = obd->u.cli.cl_import;
4088                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4089                                obd->obd_name);
4090                         ptlrpc_invalidate_import(imp);
4091                         if (imp->imp_rq_pool) {
4092                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4093                                 imp->imp_rq_pool = NULL;
4094                         }
4095                         class_destroy_import(imp);
4096                         up_write(&obd->u.cli.cl_sem);
4097                         obd->u.cli.cl_import = NULL;
4098                 }
4099                 rc = obd_llog_finish(obd, 0);
4100                 if (rc != 0)
4101                         CERROR("failed to cleanup llogging subsystems\n");
4102                 break;
4103                 }
4104         }
4105         RETURN(rc);
4106 }
4107
4108 int osc_cleanup(struct obd_device *obd)
4109 {
4110         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4111         int rc;
4112
4113         ENTRY;
4114         ptlrpc_lprocfs_unregister_obd(obd);
4115         lprocfs_obd_cleanup(obd);
4116
4117         spin_lock(&oscc->oscc_lock);
4118         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
4119         oscc->oscc_flags |= OSCC_FLAG_EXITING;
4120         spin_unlock(&oscc->oscc_lock);
4121
4122         /* free memory of osc quota cache */
4123         lquota_cleanup(quota_interface, obd);
4124
4125         rc = client_obd_cleanup(obd);
4126
4127         ptlrpcd_decref();
4128         RETURN(rc);
4129 }
4130
4131 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4132 {
4133         struct lprocfs_static_vars lvars = { 0 };
4134         int rc = 0;
4135
4136         lprocfs_osc_init_vars(&lvars);
4137
4138         switch (lcfg->lcfg_command) {
4139         default:
4140                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4141                                               lcfg, obd);
4142                 if (rc > 0)
4143                         rc = 0;
4144                 break;
4145         }
4146
4147         return(rc);
4148 }
4149
4150 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4151 {
4152         return osc_process_config_base(obd, buf);
4153 }
4154
4155 struct obd_ops osc_obd_ops = {
4156         .o_owner                = THIS_MODULE,
4157         .o_setup                = osc_setup,
4158         .o_precleanup           = osc_precleanup,
4159         .o_cleanup              = osc_cleanup,
4160         .o_add_conn             = client_import_add_conn,
4161         .o_del_conn             = client_import_del_conn,
4162         .o_connect              = client_connect_import,
4163         .o_reconnect            = osc_reconnect,
4164         .o_disconnect           = osc_disconnect,
4165         .o_statfs               = osc_statfs,
4166         .o_statfs_async         = osc_statfs_async,
4167         .o_packmd               = osc_packmd,
4168         .o_unpackmd             = osc_unpackmd,
4169         .o_precreate            = osc_precreate,
4170         .o_create               = osc_create,
4171         .o_destroy              = osc_destroy,
4172         .o_getattr              = osc_getattr,
4173         .o_getattr_async        = osc_getattr_async,
4174         .o_setattr              = osc_setattr,
4175         .o_setattr_async        = osc_setattr_async,
4176         .o_brw                  = osc_brw,
4177         .o_punch                = osc_punch,
4178         .o_sync                 = osc_sync,
4179         .o_enqueue              = osc_enqueue,
4180         .o_change_cbdata        = osc_change_cbdata,
4181         .o_cancel               = osc_cancel,
4182         .o_cancel_unused        = osc_cancel_unused,
4183         .o_iocontrol            = osc_iocontrol,
4184         .o_get_info             = osc_get_info,
4185         .o_set_info_async       = osc_set_info_async,
4186         .o_import_event         = osc_import_event,
4187         .o_llog_init            = osc_llog_init,
4188         .o_llog_finish          = osc_llog_finish,
4189         .o_process_config       = osc_process_config,
4190 };
4191
4192 extern struct lu_kmem_descr  osc_caches[];
4193 extern spinlock_t            osc_ast_guard;
4194 extern struct lock_class_key osc_ast_guard_class;
4195
4196 int __init osc_init(void)
4197 {
4198         struct lprocfs_static_vars lvars = { 0 };
4199         int rc;
4200         ENTRY;
4201
4202         /* print an address of _any_ initialized kernel symbol from this
4203          * module, to allow debugging with gdb that doesn't support data
4204          * symbols from modules.*/
4205         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4206
4207         rc = lu_kmem_init(osc_caches);
4208
4209         lprocfs_osc_init_vars(&lvars);
4210
4211         request_module("lquota");
4212         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4213         lquota_init(quota_interface);
4214         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4215
4216         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4217                                  LUSTRE_OSC_NAME, &osc_device_type);
4218         if (rc) {
4219                 if (quota_interface)
4220                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4221                 lu_kmem_fini(osc_caches);
4222                 RETURN(rc);
4223         }
4224
4225         spin_lock_init(&osc_ast_guard);
4226         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4227
4228         RETURN(rc);
4229 }
4230
#ifdef __KERNEL__
/*
 * Module unload: finalise the OSC device type, shut down and release the
 * quota interface, unregister the obd type and destroy the kmem caches.
 */
static void /*__exit*/ osc_exit(void)
{
        lu_device_type_fini(&osc_device_type);

        lquota_exit(quota_interface);
        if (quota_interface != NULL)
                PORTAL_SYMBOL_PUT(osc_quota_interface);

        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif