Whamcloud - gitweb
a6dd7cc7863cd9a072d4b9f2d76744f33a406f64
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static quota_interface_t *quota_interface = NULL;
67 extern quota_interface_t osc_quota_interface;
68
69 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
70 static int brw_interpret(const struct lu_env *env,
71                          struct ptlrpc_request *req, void *data, int rc);
72 int osc_cleanup(struct obd_device *obd);
73
74 /* Pack OSC object metadata for disk storage (LE byte order). */
75 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
76                       struct lov_stripe_md *lsm)
77 {
78         int lmm_size;
79         ENTRY;
80
81         lmm_size = sizeof(**lmmp);
82         if (!lmmp)
83                 RETURN(lmm_size);
84
85         if (*lmmp && !lsm) {
86                 OBD_FREE(*lmmp, lmm_size);
87                 *lmmp = NULL;
88                 RETURN(0);
89         }
90
91         if (!*lmmp) {
92                 OBD_ALLOC(*lmmp, lmm_size);
93                 if (!*lmmp)
94                         RETURN(-ENOMEM);
95         }
96
97         if (lsm) {
98                 LASSERT(lsm->lsm_object_id);
99                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
100                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
101                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
102         }
103
104         RETURN(lmm_size);
105 }
106
107 /* Unpack OSC object metadata from disk storage (LE byte order). */
108 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
109                         struct lov_mds_md *lmm, int lmm_bytes)
110 {
111         int lsm_size;
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
160
161         RETURN(lsm_size);
162 }
163
164 static inline void osc_pack_capa(struct ptlrpc_request *req,
165                                  struct ost_body *body, void *capa)
166 {
167         struct obd_capa *oc = (struct obd_capa *)capa;
168         struct lustre_capa *c;
169
170         if (!capa)
171                 return;
172
173         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
174         LASSERT(c);
175         capa_cpy(c, oc);
176         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
177         DEBUG_CAPA(D_SEC, c, "pack");
178 }
179
180 static inline void osc_pack_req_body(struct ptlrpc_request *req,
181                                      struct obd_info *oinfo)
182 {
183         struct ost_body *body;
184
185         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
186         LASSERT(body);
187
188         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
189         osc_pack_capa(req, body, oinfo->oi_capa);
190 }
191
192 static inline void osc_set_capa_size(struct ptlrpc_request *req,
193                                      const struct req_msg_field *field,
194                                      struct obd_capa *oc)
195 {
196         if (oc == NULL)
197                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
198         else
199                 /* it is already calculated as sizeof struct obd_capa */
200                 ;
201 }
202
203 static int osc_getattr_interpret(const struct lu_env *env,
204                                  struct ptlrpc_request *req,
205                                  struct osc_async_args *aa, int rc)
206 {
207         struct ost_body *body;
208         ENTRY;
209
210         if (rc != 0)
211                 GOTO(out, rc);
212
213         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
214         if (body) {
215                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
216                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
217
218                 /* This should really be sent by the OST */
219                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
220                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
221         } else {
222                 CDEBUG(D_INFO, "can't unpack ost_body\n");
223                 rc = -EPROTO;
224                 aa->aa_oi->oi_oa->o_valid = 0;
225         }
226 out:
227         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
228         RETURN(rc);
229 }
230
231 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
232                              struct ptlrpc_request_set *set)
233 {
234         struct ptlrpc_request *req;
235         struct osc_async_args *aa;
236         int                    rc;
237         ENTRY;
238
239         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
240         if (req == NULL)
241                 RETURN(-ENOMEM);
242
243         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
244         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
245         if (rc) {
246                 ptlrpc_request_free(req);
247                 RETURN(rc);
248         }
249
250         osc_pack_req_body(req, oinfo);
251
252         ptlrpc_request_set_replen(req);
253         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
254
255         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
256         aa = ptlrpc_req_async_args(req);
257         aa->aa_oi = oinfo;
258
259         ptlrpc_set_add_req(set, req);
260         RETURN(0);
261 }
262
263 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
264 {
265         struct ptlrpc_request *req;
266         struct ost_body       *body;
267         int                    rc;
268         ENTRY;
269
270         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
271         if (req == NULL)
272                 RETURN(-ENOMEM);
273
274         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
275         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
276         if (rc) {
277                 ptlrpc_request_free(req);
278                 RETURN(rc);
279         }
280
281         osc_pack_req_body(req, oinfo);
282
283         ptlrpc_request_set_replen(req);
284
285         rc = ptlrpc_queue_wait(req);
286         if (rc)
287                 GOTO(out, rc);
288
289         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
290         if (body == NULL)
291                 GOTO(out, rc = -EPROTO);
292
293         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
294         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
295
296         /* This should really be sent by the OST */
297         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
298         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
299
300         EXIT;
301  out:
302         ptlrpc_req_finished(req);
303         return rc;
304 }
305
306 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
307                        struct obd_trans_info *oti)
308 {
309         struct ptlrpc_request *req;
310         struct ost_body       *body;
311         int                    rc;
312         ENTRY;
313
314         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
315
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
317         if (req == NULL)
318                 RETURN(-ENOMEM);
319
320         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
321         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
322         if (rc) {
323                 ptlrpc_request_free(req);
324                 RETURN(rc);
325         }
326
327         osc_pack_req_body(req, oinfo);
328
329         ptlrpc_request_set_replen(req);
330
331         rc = ptlrpc_queue_wait(req);
332         if (rc)
333                 GOTO(out, rc);
334
335         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 GOTO(out, rc = -EPROTO);
338
339         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
340
341         EXIT;
342 out:
343         ptlrpc_req_finished(req);
344         RETURN(rc);
345 }
346
347 static int osc_setattr_interpret(const struct lu_env *env,
348                                  struct ptlrpc_request *req,
349                                  struct osc_setattr_args *sa, int rc)
350 {
351         struct ost_body *body;
352         ENTRY;
353
354         if (rc != 0)
355                 GOTO(out, rc);
356
357         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
358         if (body == NULL)
359                 GOTO(out, rc = -EPROTO);
360
361         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
362 out:
363         rc = sa->sa_upcall(sa->sa_cookie, rc);
364         RETURN(rc);
365 }
366
367 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
368                            struct obd_trans_info *oti,
369                            obd_enqueue_update_f upcall, void *cookie,
370                            struct ptlrpc_request_set *rqset)
371 {
372         struct ptlrpc_request   *req;
373         struct osc_setattr_args *sa;
374         int                      rc;
375         ENTRY;
376
377         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
378         if (req == NULL)
379                 RETURN(-ENOMEM);
380
381         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
382         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
383         if (rc) {
384                 ptlrpc_request_free(req);
385                 RETURN(rc);
386         }
387
388         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
389                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
390
391         osc_pack_req_body(req, oinfo);
392
393         ptlrpc_request_set_replen(req);
394
395         /* do mds to ost setattr asynchronously */
396         if (!rqset) {
397                 /* Do not wait for response. */
398                 ptlrpcd_add_req(req, PSCOPE_OTHER);
399         } else {
400                 req->rq_interpret_reply =
401                         (ptlrpc_interpterer_t)osc_setattr_interpret;
402
403                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
404                 sa = ptlrpc_req_async_args(req);
405                 sa->sa_oa = oinfo->oi_oa;
406                 sa->sa_upcall = upcall;
407                 sa->sa_cookie = cookie;
408
409                 if (rqset == PTLRPCD_SET)
410                         ptlrpcd_add_req(req, PSCOPE_OTHER);
411                 else
412                         ptlrpc_set_add_req(rqset, req);
413         }
414
415         RETURN(0);
416 }
417
418 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
419                              struct obd_trans_info *oti,
420                              struct ptlrpc_request_set *rqset)
421 {
422         return osc_setattr_async_base(exp, oinfo, oti,
423                                       oinfo->oi_cb_up, oinfo, rqset);
424 }
425
426 int osc_real_create(struct obd_export *exp, struct obdo *oa,
427                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
428 {
429         struct ptlrpc_request *req;
430         struct ost_body       *body;
431         struct lov_stripe_md  *lsm;
432         int                    rc;
433         ENTRY;
434
435         LASSERT(oa);
436         LASSERT(ea);
437
438         lsm = *ea;
439         if (!lsm) {
440                 rc = obd_alloc_memmd(exp, &lsm);
441                 if (rc < 0)
442                         RETURN(rc);
443         }
444
445         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
446         if (req == NULL)
447                 GOTO(out, rc = -ENOMEM);
448
449         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
450         if (rc) {
451                 ptlrpc_request_free(req);
452                 GOTO(out, rc);
453         }
454
455         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
456         LASSERT(body);
457         lustre_set_wire_obdo(&body->oa, oa);
458
459         ptlrpc_request_set_replen(req);
460
461         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
462             oa->o_flags == OBD_FL_DELORPHAN) {
463                 DEBUG_REQ(D_HA, req,
464                           "delorphan from OST integration");
465                 /* Don't resend the delorphan req */
466                 req->rq_no_resend = req->rq_no_delay = 1;
467         }
468
469         rc = ptlrpc_queue_wait(req);
470         if (rc)
471                 GOTO(out_req, rc);
472
473         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
474         if (body == NULL)
475                 GOTO(out_req, rc = -EPROTO);
476
477         lustre_get_wire_obdo(oa, &body->oa);
478
479         /* This should really be sent by the OST */
480         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
481         oa->o_valid |= OBD_MD_FLBLKSZ;
482
483         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
484          * have valid lsm_oinfo data structs, so don't go touching that.
485          * This needs to be fixed in a big way.
486          */
487         lsm->lsm_object_id = oa->o_id;
488         lsm->lsm_object_seq = oa->o_seq;
489         *ea = lsm;
490
491         if (oti != NULL) {
492                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
493
494                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
495                         if (!oti->oti_logcookies)
496                                 oti_alloc_cookies(oti, 1);
497                         *oti->oti_logcookies = oa->o_lcookie;
498                 }
499         }
500
501         CDEBUG(D_HA, "transno: "LPD64"\n",
502                lustre_msg_get_transno(req->rq_repmsg));
503 out_req:
504         ptlrpc_req_finished(req);
505 out:
506         if (rc && !*ea)
507                 obd_free_memmd(exp, &lsm);
508         RETURN(rc);
509 }
510
511 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
512                    obd_enqueue_update_f upcall, void *cookie,
513                    struct ptlrpc_request_set *rqset)
514 {
515         struct ptlrpc_request   *req;
516         struct osc_setattr_args *sa;
517         struct ost_body         *body;
518         int                      rc;
519         ENTRY;
520
521         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
522         if (req == NULL)
523                 RETURN(-ENOMEM);
524
525         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
526         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
527         if (rc) {
528                 ptlrpc_request_free(req);
529                 RETURN(rc);
530         }
531         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
532         ptlrpc_at_set_req_timeout(req);
533
534         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
535         LASSERT(body);
536         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
537         osc_pack_capa(req, body, oinfo->oi_capa);
538
539         ptlrpc_request_set_replen(req);
540
541
542         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
543         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
544         sa = ptlrpc_req_async_args(req);
545         sa->sa_oa     = oinfo->oi_oa;
546         sa->sa_upcall = upcall;
547         sa->sa_cookie = cookie;
548         if (rqset == PTLRPCD_SET)
549                 ptlrpcd_add_req(req, PSCOPE_OTHER);
550         else
551                 ptlrpc_set_add_req(rqset, req);
552
553         RETURN(0);
554 }
555
556 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
557                      struct obd_trans_info *oti,
558                      struct ptlrpc_request_set *rqset)
559 {
560         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
561         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
562         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
563         return osc_punch_base(exp, oinfo,
564                               oinfo->oi_cb_up, oinfo, rqset);
565 }
566
567 static int osc_sync_interpret(const struct lu_env *env,
568                               struct ptlrpc_request *req,
569                               void *arg, int rc)
570 {
571         struct osc_async_args *aa = arg;
572         struct ost_body *body;
573         ENTRY;
574
575         if (rc)
576                 GOTO(out, rc);
577
578         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
579         if (body == NULL) {
580                 CERROR ("can't unpack ost_body\n");
581                 GOTO(out, rc = -EPROTO);
582         }
583
584         *aa->aa_oi->oi_oa = body->oa;
585 out:
586         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
587         RETURN(rc);
588 }
589
590 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
591                     obd_size start, obd_size end,
592                     struct ptlrpc_request_set *set)
593 {
594         struct ptlrpc_request *req;
595         struct ost_body       *body;
596         struct osc_async_args *aa;
597         int                    rc;
598         ENTRY;
599
600         if (!oinfo->oi_oa) {
601                 CDEBUG(D_INFO, "oa NULL\n");
602                 RETURN(-EINVAL);
603         }
604
605         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
606         if (req == NULL)
607                 RETURN(-ENOMEM);
608
609         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
610         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
611         if (rc) {
612                 ptlrpc_request_free(req);
613                 RETURN(rc);
614         }
615
616         /* overload the size and blocks fields in the oa with start/end */
617         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
618         LASSERT(body);
619         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
620         body->oa.o_size = start;
621         body->oa.o_blocks = end;
622         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
623         osc_pack_capa(req, body, oinfo->oi_capa);
624
625         ptlrpc_request_set_replen(req);
626         req->rq_interpret_reply = osc_sync_interpret;
627
628         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
629         aa = ptlrpc_req_async_args(req);
630         aa->aa_oi = oinfo;
631
632         ptlrpc_set_add_req(set, req);
633         RETURN (0);
634 }
635
636 /* Find and cancel locally locks matched by @mode in the resource found by
637  * @objid. Found locks are added into @cancel list. Returns the amount of
638  * locks added to @cancels list. */
639 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
640                                    cfs_list_t *cancels,
641                                    ldlm_mode_t mode, int lock_flags)
642 {
643         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
644         struct ldlm_res_id res_id;
645         struct ldlm_resource *res;
646         int count;
647         ENTRY;
648
649         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
650         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
651         if (res == NULL)
652                 RETURN(0);
653
654         LDLM_RESOURCE_ADDREF(res);
655         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
656                                            lock_flags, 0, NULL);
657         LDLM_RESOURCE_DELREF(res);
658         ldlm_resource_putref(res);
659         RETURN(count);
660 }
661
662 static int osc_destroy_interpret(const struct lu_env *env,
663                                  struct ptlrpc_request *req, void *data,
664                                  int rc)
665 {
666         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
667
668         cfs_atomic_dec(&cli->cl_destroy_in_flight);
669         cfs_waitq_signal(&cli->cl_destroy_waitq);
670         return 0;
671 }
672
673 static int osc_can_send_destroy(struct client_obd *cli)
674 {
675         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
676             cli->cl_max_rpcs_in_flight) {
677                 /* The destroy request can be sent */
678                 return 1;
679         }
680         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
681             cli->cl_max_rpcs_in_flight) {
682                 /*
683                  * The counter has been modified between the two atomic
684                  * operations.
685                  */
686                 cfs_waitq_signal(&cli->cl_destroy_waitq);
687         }
688         return 0;
689 }
690
691 /* Destroy requests can be async always on the client, and we don't even really
692  * care about the return code since the client cannot do anything at all about
693  * a destroy failure.
694  * When the MDS is unlinking a filename, it saves the file objects into a
695  * recovery llog, and these object records are cancelled when the OST reports
696  * they were destroyed and sync'd to disk (i.e. transaction committed).
697  * If the client dies, or the OST is down when the object should be destroyed,
698  * the records are not cancelled, and when the OST reconnects to the MDS next,
699  * it will retrieve the llog unlink logs and then sends the log cancellation
700  * cookies to the MDS after committing destroy transactions. */
701 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
702                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
703                        struct obd_export *md_export, void *capa)
704 {
705         struct client_obd     *cli = &exp->exp_obd->u.cli;
706         struct ptlrpc_request *req;
707         struct ost_body       *body;
708         CFS_LIST_HEAD(cancels);
709         int rc, count;
710         ENTRY;
711
712         if (!oa) {
713                 CDEBUG(D_INFO, "oa NULL\n");
714                 RETURN(-EINVAL);
715         }
716
717         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
718                                         LDLM_FL_DISCARD_DATA);
719
720         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
721         if (req == NULL) {
722                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
723                 RETURN(-ENOMEM);
724         }
725
726         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
727         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
728                                0, &cancels, count);
729         if (rc) {
730                 ptlrpc_request_free(req);
731                 RETURN(rc);
732         }
733
734         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
735         ptlrpc_at_set_req_timeout(req);
736
737         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
738                 oa->o_lcookie = *oti->oti_logcookies;
739         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
740         LASSERT(body);
741         lustre_set_wire_obdo(&body->oa, oa);
742
743         osc_pack_capa(req, body, (struct obd_capa *)capa);
744         ptlrpc_request_set_replen(req);
745
746         /* don't throttle destroy RPCs for the MDT */
747         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
748                 req->rq_interpret_reply = osc_destroy_interpret;
749                 if (!osc_can_send_destroy(cli)) {
750                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
751                                                           NULL);
752
753                         /*
754                          * Wait until the number of on-going destroy RPCs drops
755                          * under max_rpc_in_flight
756                          */
757                         l_wait_event_exclusive(cli->cl_destroy_waitq,
758                                                osc_can_send_destroy(cli), &lwi);
759                 }
760         }
761
762         /* Do not wait for response */
763         ptlrpcd_add_req(req, PSCOPE_OTHER);
764         RETURN(0);
765 }
766
767 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
768                                 long writing_bytes)
769 {
770         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
771
772         LASSERT(!(oa->o_valid & bits));
773
774         oa->o_valid |= bits;
775         client_obd_list_lock(&cli->cl_loi_list_lock);
776         oa->o_dirty = cli->cl_dirty;
777         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
778                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
779                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
780                 oa->o_undirty = 0;
781         } else if (cfs_atomic_read(&obd_dirty_pages) -
782                    cfs_atomic_read(&obd_dirty_transit_pages) >
783                    obd_max_dirty_pages + 1){
784                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
785                  * not covered by a lock thus they may safely race and trip
786                  * this CERROR() unless we add in a small fudge factor (+1). */
787                 CERROR("dirty %d - %d > system dirty_max %d\n",
788                        cfs_atomic_read(&obd_dirty_pages),
789                        cfs_atomic_read(&obd_dirty_transit_pages),
790                        obd_max_dirty_pages);
791                 oa->o_undirty = 0;
792         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
793                 CERROR("dirty %lu - dirty_max %lu too big???\n",
794                        cli->cl_dirty, cli->cl_dirty_max);
795                 oa->o_undirty = 0;
796         } else {
797                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
798                                 (cli->cl_max_rpcs_in_flight + 1);
799                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
800         }
801         oa->o_grant = cli->cl_avail_grant;
802         oa->o_dropped = cli->cl_lost_grant;
803         cli->cl_lost_grant = 0;
804         client_obd_list_unlock(&cli->cl_loi_list_lock);
805         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
806                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
807
808 }
809
810 static void osc_update_next_shrink(struct client_obd *cli)
811 {
812         cli->cl_next_shrink_grant =
813                 cfs_time_shift(cli->cl_grant_shrink_interval);
814         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
815                cli->cl_next_shrink_grant);
816 }
817
818 /* caller must hold loi_list_lock */
819 static void osc_consume_write_grant(struct client_obd *cli,
820                                     struct brw_page *pga)
821 {
822         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
823         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
824         cfs_atomic_inc(&obd_dirty_pages);
825         cli->cl_dirty += CFS_PAGE_SIZE;
826         cli->cl_avail_grant -= CFS_PAGE_SIZE;
827         pga->flag |= OBD_BRW_FROM_GRANT;
828         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
829                CFS_PAGE_SIZE, pga, pga->pg);
830         LASSERT(cli->cl_avail_grant >= 0);
831         osc_update_next_shrink(cli);
832 }
833
834 /* the companion to osc_consume_write_grant, called when a brw has completed.
835  * must be called with the loi lock held. */
836 static void osc_release_write_grant(struct client_obd *cli,
837                                     struct brw_page *pga, int sent)
838 {
839         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
840         ENTRY;
841
842         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
843         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
844                 EXIT;
845                 return;
846         }
847
848         pga->flag &= ~OBD_BRW_FROM_GRANT;
849         cfs_atomic_dec(&obd_dirty_pages);
850         cli->cl_dirty -= CFS_PAGE_SIZE;
851         if (pga->flag & OBD_BRW_NOCACHE) {
852                 pga->flag &= ~OBD_BRW_NOCACHE;
853                 cfs_atomic_dec(&obd_dirty_transit_pages);
854                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
855         }
856         if (!sent) {
857                 cli->cl_lost_grant += CFS_PAGE_SIZE;
858                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
859                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
860         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
861                 /* For short writes we shouldn't count parts of pages that
862                  * span a whole block on the OST side, or our accounting goes
863                  * wrong.  Should match the code in filter_grant_check. */
864                 int offset = pga->off & ~CFS_PAGE_MASK;
865                 int count = pga->count + (offset & (blocksize - 1));
866                 int end = (offset + pga->count) & (blocksize - 1);
867                 if (end)
868                         count += blocksize - end;
869
870                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
871                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
872                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
873                        cli->cl_avail_grant, cli->cl_dirty);
874         }
875
876         EXIT;
877 }
878
879 static unsigned long rpcs_in_flight(struct client_obd *cli)
880 {
881         return cli->cl_r_in_flight + cli->cl_w_in_flight;
882 }
883
884 /* caller must hold loi_list_lock */
885 void osc_wake_cache_waiters(struct client_obd *cli)
886 {
887         cfs_list_t *l, *tmp;
888         struct osc_cache_waiter *ocw;
889
890         ENTRY;
891         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
892                 /* if we can't dirty more, we must wait until some is written */
893                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
894                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
895                     obd_max_dirty_pages)) {
896                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
897                                "osc max %ld, sys max %d\n", cli->cl_dirty,
898                                cli->cl_dirty_max, obd_max_dirty_pages);
899                         return;
900                 }
901
902                 /* if still dirty cache but no grant wait for pending RPCs that
903                  * may yet return us some grant before doing sync writes */
904                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
905                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
906                                cli->cl_w_in_flight);
907                         return;
908                 }
909
910                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
911                 cfs_list_del_init(&ocw->ocw_entry);
912                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
913                         /* no more RPCs in flight to return grant, do sync IO */
914                         ocw->ocw_rc = -EDQUOT;
915                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
916                 } else {
917                         osc_consume_write_grant(cli,
918                                                 &ocw->ocw_oap->oap_brw_page);
919                 }
920
921                 cfs_waitq_signal(&ocw->ocw_waitq);
922         }
923
924         EXIT;
925 }
926
927 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
928 {
929         client_obd_list_lock(&cli->cl_loi_list_lock);
930         cli->cl_avail_grant += grant;
931         client_obd_list_unlock(&cli->cl_loi_list_lock);
932 }
933
934 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
935 {
936         if (body->oa.o_valid & OBD_MD_FLGRANT) {
937                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
938                 __osc_update_grant(cli, body->oa.o_grant);
939         }
940 }
941
942 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
943                               void *key, obd_count vallen, void *val,
944                               struct ptlrpc_request_set *set);
945
946 static int osc_shrink_grant_interpret(const struct lu_env *env,
947                                       struct ptlrpc_request *req,
948                                       void *aa, int rc)
949 {
950         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
951         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
952         struct ost_body *body;
953
954         if (rc != 0) {
955                 __osc_update_grant(cli, oa->o_grant);
956                 GOTO(out, rc);
957         }
958
959         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
960         LASSERT(body);
961         osc_update_grant(cli, body);
962 out:
963         OBDO_FREE(oa);
964         return rc;
965 }
966
967 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
968 {
969         client_obd_list_lock(&cli->cl_loi_list_lock);
970         oa->o_grant = cli->cl_avail_grant / 4;
971         cli->cl_avail_grant -= oa->o_grant;
972         client_obd_list_unlock(&cli->cl_loi_list_lock);
973         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
974                 oa->o_valid |= OBD_MD_FLFLAGS;
975                 oa->o_flags = 0;
976         }
977         oa->o_flags |= OBD_FL_SHRINK_GRANT;
978         osc_update_next_shrink(cli);
979 }
980
981 /* Shrink the current grant, either from some large amount to enough for a
982  * full set of in-flight RPCs, or if we have already shrunk to that limit
983  * then to enough for a single RPC.  This avoids keeping more grant than
984  * needed, and avoids shrinking the grant piecemeal. */
985 static int osc_shrink_grant(struct client_obd *cli)
986 {
987         long target = (cli->cl_max_rpcs_in_flight + 1) *
988                       cli->cl_max_pages_per_rpc;
989
990         client_obd_list_lock(&cli->cl_loi_list_lock);
991         if (cli->cl_avail_grant <= target)
992                 target = cli->cl_max_pages_per_rpc;
993         client_obd_list_unlock(&cli->cl_loi_list_lock);
994
995         return osc_shrink_grant_to_target(cli, target);
996 }
997
998 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
999 {
1000         int    rc = 0;
1001         struct ost_body     *body;
1002         ENTRY;
1003
1004         client_obd_list_lock(&cli->cl_loi_list_lock);
1005         /* Don't shrink if we are already above or below the desired limit
1006          * We don't want to shrink below a single RPC, as that will negatively
1007          * impact block allocation and long-term performance. */
1008         if (target < cli->cl_max_pages_per_rpc)
1009                 target = cli->cl_max_pages_per_rpc;
1010
1011         if (target >= cli->cl_avail_grant) {
1012                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1013                 RETURN(0);
1014         }
1015         client_obd_list_unlock(&cli->cl_loi_list_lock);
1016
1017         OBD_ALLOC_PTR(body);
1018         if (!body)
1019                 RETURN(-ENOMEM);
1020
1021         osc_announce_cached(cli, &body->oa, 0);
1022
1023         client_obd_list_lock(&cli->cl_loi_list_lock);
1024         body->oa.o_grant = cli->cl_avail_grant - target;
1025         cli->cl_avail_grant = target;
1026         client_obd_list_unlock(&cli->cl_loi_list_lock);
1027         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1028                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1029                 body->oa.o_flags = 0;
1030         }
1031         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1032         osc_update_next_shrink(cli);
1033
1034         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1035                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1036                                 sizeof(*body), body, NULL);
1037         if (rc != 0)
1038                 __osc_update_grant(cli, body->oa.o_grant);
1039         OBD_FREE_PTR(body);
1040         RETURN(rc);
1041 }
1042
1043 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1044 static int osc_should_shrink_grant(struct client_obd *client)
1045 {
1046         cfs_time_t time = cfs_time_current();
1047         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1048
1049         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1050              OBD_CONNECT_GRANT_SHRINK) == 0)
1051                 return 0;
1052
1053         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1054                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1055                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1056                         return 1;
1057                 else
1058                         osc_update_next_shrink(client);
1059         }
1060         return 0;
1061 }
1062
1063 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1064 {
1065         struct client_obd *client;
1066
1067         cfs_list_for_each_entry(client, &item->ti_obd_list,
1068                                 cl_grant_shrink_list) {
1069                 if (osc_should_shrink_grant(client))
1070                         osc_shrink_grant(client);
1071         }
1072         return 0;
1073 }
1074
1075 static int osc_add_shrink_grant(struct client_obd *client)
1076 {
1077         int rc;
1078
1079         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1080                                        TIMEOUT_GRANT,
1081                                        osc_grant_shrink_grant_cb, NULL,
1082                                        &client->cl_grant_shrink_list);
1083         if (rc) {
1084                 CERROR("add grant client %s error %d\n",
1085                         client->cl_import->imp_obd->obd_name, rc);
1086                 return rc;
1087         }
1088         CDEBUG(D_CACHE, "add grant client %s \n",
1089                client->cl_import->imp_obd->obd_name);
1090         osc_update_next_shrink(client);
1091         return 0;
1092 }
1093
1094 static int osc_del_shrink_grant(struct client_obd *client)
1095 {
1096         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1097                                          TIMEOUT_GRANT);
1098 }
1099
1100 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1101 {
1102         /*
1103          * ocd_grant is the total grant amount we're expect to hold: if we've
1104          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1105          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1106          *
1107          * race is tolerable here: if we're evicted, but imp_state already
1108          * left EVICTED state, then cl_dirty must be 0 already.
1109          */
1110         client_obd_list_lock(&cli->cl_loi_list_lock);
1111         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1112                 cli->cl_avail_grant = ocd->ocd_grant;
1113         else
1114                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1115
1116         if (cli->cl_avail_grant < 0) {
1117                 CWARN("%s: available grant < 0, the OSS is probably not running"
1118                       " with patch from bug20278 (%ld) \n",
1119                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1120                 /* workaround for 1.6 servers which do not have 
1121                  * the patch from bug20278 */
1122                 cli->cl_avail_grant = ocd->ocd_grant;
1123         }
1124
1125         client_obd_list_unlock(&cli->cl_loi_list_lock);
1126
1127         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1128                cli->cl_import->imp_obd->obd_name,
1129                cli->cl_avail_grant, cli->cl_lost_grant);
1130
1131         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1132             cfs_list_empty(&cli->cl_grant_shrink_list))
1133                 osc_add_shrink_grant(cli);
1134 }
1135
1136 /* We assume that the reason this OSC got a short read is because it read
1137  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1138  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1139  * this stripe never got written at or beyond this stripe offset yet. */
1140 static void handle_short_read(int nob_read, obd_count page_count,
1141                               struct brw_page **pga)
1142 {
1143         char *ptr;
1144         int i = 0;
1145
1146         /* skip bytes read OK */
1147         while (nob_read > 0) {
1148                 LASSERT (page_count > 0);
1149
1150                 if (pga[i]->count > nob_read) {
1151                         /* EOF inside this page */
1152                         ptr = cfs_kmap(pga[i]->pg) +
1153                                 (pga[i]->off & ~CFS_PAGE_MASK);
1154                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1155                         cfs_kunmap(pga[i]->pg);
1156                         page_count--;
1157                         i++;
1158                         break;
1159                 }
1160
1161                 nob_read -= pga[i]->count;
1162                 page_count--;
1163                 i++;
1164         }
1165
1166         /* zero remaining pages */
1167         while (page_count-- > 0) {
1168                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1169                 memset(ptr, 0, pga[i]->count);
1170                 cfs_kunmap(pga[i]->pg);
1171                 i++;
1172         }
1173 }
1174
1175 static int check_write_rcs(struct ptlrpc_request *req,
1176                            int requested_nob, int niocount,
1177                            obd_count page_count, struct brw_page **pga)
1178 {
1179         int     i;
1180         __u32   *remote_rcs;
1181
1182         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1183                                                   sizeof(*remote_rcs) *
1184                                                   niocount);
1185         if (remote_rcs == NULL) {
1186                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1187                 return(-EPROTO);
1188         }
1189
1190         /* return error if any niobuf was in error */
1191         for (i = 0; i < niocount; i++) {
1192                 if (remote_rcs[i] < 0)
1193                         return(remote_rcs[i]);
1194
1195                 if (remote_rcs[i] != 0) {
1196                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1197                                 i, remote_rcs[i], req);
1198                         return(-EPROTO);
1199                 }
1200         }
1201
1202         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1203                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1204                        req->rq_bulk->bd_nob_transferred, requested_nob);
1205                 return(-EPROTO);
1206         }
1207
1208         return (0);
1209 }
1210
1211 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1212 {
1213         if (p1->flag != p2->flag) {
1214                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1215                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1216
1217                 /* warn if we try to combine flags that we don't know to be
1218                  * safe to combine */
1219                 if ((p1->flag & mask) != (p2->flag & mask))
1220                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1221                                "same brw?\n", p1->flag, p2->flag);
1222                 return 0;
1223         }
1224
1225         return (p1->off + p1->count == p2->off);
1226 }
1227
1228 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1229                                    struct brw_page **pga, int opc,
1230                                    cksum_type_t cksum_type)
1231 {
1232         __u32 cksum;
1233         int i = 0;
1234
1235         LASSERT (pg_count > 0);
1236         cksum = init_checksum(cksum_type);
1237         while (nob > 0 && pg_count > 0) {
1238                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1239                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1240                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1241
1242                 /* corrupt the data before we compute the checksum, to
1243                  * simulate an OST->client data error */
1244                 if (i == 0 && opc == OST_READ &&
1245                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1246                         memcpy(ptr + off, "bad1", min(4, nob));
1247                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1248                 cfs_kunmap(pga[i]->pg);
1249                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1250                                off, cksum);
1251
1252                 nob -= pga[i]->count;
1253                 pg_count--;
1254                 i++;
1255         }
1256         /* For sending we only compute the wrong checksum instead
1257          * of corrupting the data so it is still correct on a redo */
1258         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1259                 cksum++;
1260
1261         return cksum;
1262 }
1263
1264 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1265                                 struct lov_stripe_md *lsm, obd_count page_count,
1266                                 struct brw_page **pga,
1267                                 struct ptlrpc_request **reqp,
1268                                 struct obd_capa *ocapa, int reserve,
1269                                 int resend)
1270 {
1271         struct ptlrpc_request   *req;
1272         struct ptlrpc_bulk_desc *desc;
1273         struct ost_body         *body;
1274         struct obd_ioobj        *ioobj;
1275         struct niobuf_remote    *niobuf;
1276         int niocount, i, requested_nob, opc, rc;
1277         struct osc_brw_async_args *aa;
1278         struct req_capsule      *pill;
1279         struct brw_page *pg_prev;
1280
1281         ENTRY;
1282         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1283                 RETURN(-ENOMEM); /* Recoverable */
1284         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1285                 RETURN(-EINVAL); /* Fatal */
1286
1287         if ((cmd & OBD_BRW_WRITE) != 0) {
1288                 opc = OST_WRITE;
1289                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1290                                                 cli->cl_import->imp_rq_pool,
1291                                                 &RQF_OST_BRW_WRITE);
1292         } else {
1293                 opc = OST_READ;
1294                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1295         }
1296         if (req == NULL)
1297                 RETURN(-ENOMEM);
1298
1299         for (niocount = i = 1; i < page_count; i++) {
1300                 if (!can_merge_pages(pga[i - 1], pga[i]))
1301                         niocount++;
1302         }
1303
1304         pill = &req->rq_pill;
1305         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1306                              sizeof(*ioobj));
1307         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1308                              niocount * sizeof(*niobuf));
1309         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1310
1311         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1312         if (rc) {
1313                 ptlrpc_request_free(req);
1314                 RETURN(rc);
1315         }
1316         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1317         ptlrpc_at_set_req_timeout(req);
1318
1319         if (opc == OST_WRITE)
1320                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1321                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1322         else
1323                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1324                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1325
1326         if (desc == NULL)
1327                 GOTO(out, rc = -ENOMEM);
1328         /* NB request now owns desc and will free it when it gets freed */
1329
1330         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1331         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1332         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1333         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1334
1335         lustre_set_wire_obdo(&body->oa, oa);
1336
1337         obdo_to_ioobj(oa, ioobj);
1338         ioobj->ioo_bufcnt = niocount;
1339         osc_pack_capa(req, body, ocapa);
1340         LASSERT (page_count > 0);
1341         pg_prev = pga[0];
1342         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1343                 struct brw_page *pg = pga[i];
1344
1345                 LASSERT(pg->count > 0);
1346                 LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
1347                          "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
1348                          pg->off, pg->count);
1349 #ifdef __linux__
1350                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1351                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1352                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1353                          i, page_count,
1354                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1355                          pg_prev->pg, page_private(pg_prev->pg),
1356                          pg_prev->pg->index, pg_prev->off);
1357 #else
1358                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1359                          "i %d p_c %u\n", i, page_count);
1360 #endif
1361                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1362                         (pg->flag & OBD_BRW_SRVLOCK));
1363
1364                 ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
1365                                       pg->count);
1366                 requested_nob += pg->count;
1367
1368                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1369                         niobuf--;
1370                         niobuf->len += pg->count;
1371                 } else {
1372                         niobuf->offset = pg->off;
1373                         niobuf->len    = pg->count;
1374                         niobuf->flags  = pg->flag;
1375                 }
1376                 pg_prev = pg;
1377         }
1378
1379         LASSERTF((void *)(niobuf - niocount) ==
1380                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1381                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1382                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1383
1384         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1385         if (resend) {
1386                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1387                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1388                         body->oa.o_flags = 0;
1389                 }
1390                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1391         }
1392
1393         if (osc_should_shrink_grant(cli))
1394                 osc_shrink_grant_local(cli, &body->oa);
1395
1396         /* size[REQ_REC_OFF] still sizeof (*body) */
1397         if (opc == OST_WRITE) {
1398                 if (unlikely(cli->cl_checksum) &&
1399                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1400                         /* store cl_cksum_type in a local variable since
1401                          * it can be changed via lprocfs */
1402                         cksum_type_t cksum_type = cli->cl_cksum_type;
1403
1404                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1405                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1406                                 body->oa.o_flags = 0;
1407                         }
1408                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1409                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1410                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1411                                                              page_count, pga,
1412                                                              OST_WRITE,
1413                                                              cksum_type);
1414                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1415                                body->oa.o_cksum);
1416                         /* save this in 'oa', too, for later checking */
1417                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1418                         oa->o_flags |= cksum_type_pack(cksum_type);
1419                 } else {
1420                         /* clear out the checksum flag, in case this is a
1421                          * resend but cl_checksum is no longer set. b=11238 */
1422                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1423                 }
1424                 oa->o_cksum = body->oa.o_cksum;
1425                 /* 1 RC per niobuf */
1426                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1427                                      sizeof(__u32) * niocount);
1428         } else {
1429                 if (unlikely(cli->cl_checksum) &&
1430                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1431                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1432                                 body->oa.o_flags = 0;
1433                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1434                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1435                 }
1436         }
1437         ptlrpc_request_set_replen(req);
1438
1439         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1440         aa = ptlrpc_req_async_args(req);
1441         aa->aa_oa = oa;
1442         aa->aa_requested_nob = requested_nob;
1443         aa->aa_nio_count = niocount;
1444         aa->aa_page_count = page_count;
1445         aa->aa_resends = 0;
1446         aa->aa_ppga = pga;
1447         aa->aa_cli = cli;
1448         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1449         if (ocapa && reserve)
1450                 aa->aa_ocapa = capa_get(ocapa);
1451
1452         *reqp = req;
1453         RETURN(0);
1454
1455  out:
1456         ptlrpc_req_finished(req);
1457         RETURN(rc);
1458 }
1459
1460 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1461                                 __u32 client_cksum, __u32 server_cksum, int nob,
1462                                 obd_count page_count, struct brw_page **pga,
1463                                 cksum_type_t client_cksum_type)
1464 {
1465         __u32 new_cksum;
1466         char *msg;
1467         cksum_type_t cksum_type;
1468
1469         if (server_cksum == client_cksum) {
1470                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1471                 return 0;
1472         }
1473
1474         /* If this is mmaped file - it can be changed at any time */
1475         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1476                 return 1;
1477
1478         if (oa->o_valid & OBD_MD_FLFLAGS)
1479                 cksum_type = cksum_type_unpack(oa->o_flags);
1480         else
1481                 cksum_type = OBD_CKSUM_CRC32;
1482
1483         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1484                                       cksum_type);
1485
1486         if (cksum_type != client_cksum_type)
1487                 msg = "the server did not use the checksum type specified in "
1488                       "the original request - likely a protocol problem";
1489         else if (new_cksum == server_cksum)
1490                 msg = "changed on the client after we checksummed it - "
1491                       "likely false positive due to mmap IO (bug 11742)";
1492         else if (new_cksum == client_cksum)
1493                 msg = "changed in transit before arrival at OST";
1494         else
1495                 msg = "changed in transit AND doesn't match the original - "
1496                       "likely false positive due to mmap IO (bug 11742)";
1497
1498         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1499                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1500                            msg, libcfs_nid2str(peer->nid),
1501                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1502                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1503                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1504                            oa->o_id,
1505                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1506                            pga[0]->off,
1507                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1508         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1509                "client csum now %x\n", client_cksum, client_cksum_type,
1510                server_cksum, cksum_type, new_cksum);
1511         return 1;
1512 }
1513
1514 /* Note rc enters this function as number of bytes transferred */
1515 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1516 {
1517         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1518         const lnet_process_id_t *peer =
1519                         &req->rq_import->imp_connection->c_peer;
1520         struct client_obd *cli = aa->aa_cli;
1521         struct ost_body *body;
1522         __u32 client_cksum = 0;
1523         ENTRY;
1524
1525         if (rc < 0 && rc != -EDQUOT) {
1526                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1527                 RETURN(rc);
1528         }
1529
1530         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1531         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1532         if (body == NULL) {
1533                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1534                 RETURN(-EPROTO);
1535         }
1536
1537 #ifdef HAVE_QUOTA_SUPPORT
1538         /* set/clear over quota flag for a uid/gid */
1539         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1540             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1541                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1542
1543                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1544                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1545                        body->oa.o_flags);
1546                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1547                              body->oa.o_flags);
1548         }
1549 #endif
1550
1551         osc_update_grant(cli, body);
1552
1553         if (rc < 0)
1554                 RETURN(rc);
1555
1556         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1557                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1558
1559         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1560                 if (rc > 0) {
1561                         CERROR("Unexpected +ve rc %d\n", rc);
1562                         RETURN(-EPROTO);
1563                 }
1564                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1565
1566                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1567                         RETURN(-EAGAIN);
1568
1569                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1570                     check_write_checksum(&body->oa, peer, client_cksum,
1571                                          body->oa.o_cksum, aa->aa_requested_nob,
1572                                          aa->aa_page_count, aa->aa_ppga,
1573                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1574                         RETURN(-EAGAIN);
1575
1576                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1577                                      aa->aa_page_count, aa->aa_ppga);
1578                 GOTO(out, rc);
1579         }
1580
1581         /* The rest of this function executes only for OST_READs */
1582
1583         /* if unwrap_bulk failed, return -EAGAIN to retry */
1584         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1585         if (rc < 0)
1586                 GOTO(out, rc = -EAGAIN);
1587
1588         if (rc > aa->aa_requested_nob) {
1589                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1590                        aa->aa_requested_nob);
1591                 RETURN(-EPROTO);
1592         }
1593
1594         if (rc != req->rq_bulk->bd_nob_transferred) {
1595                 CERROR ("Unexpected rc %d (%d transferred)\n",
1596                         rc, req->rq_bulk->bd_nob_transferred);
1597                 return (-EPROTO);
1598         }
1599
1600         if (rc < aa->aa_requested_nob)
1601                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1602
1603         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1604                 static int cksum_counter;
1605                 __u32      server_cksum = body->oa.o_cksum;
1606                 char      *via;
1607                 char      *router;
1608                 cksum_type_t cksum_type;
1609
1610                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1611                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1612                 else
1613                         cksum_type = OBD_CKSUM_CRC32;
1614                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1615                                                  aa->aa_ppga, OST_READ,
1616                                                  cksum_type);
1617
1618                 if (peer->nid == req->rq_bulk->bd_sender) {
1619                         via = router = "";
1620                 } else {
1621                         via = " via ";
1622                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1623                 }
1624
1625                 if (server_cksum == ~0 && rc > 0) {
1626                         CERROR("Protocol error: server %s set the 'checksum' "
1627                                "bit, but didn't send a checksum.  Not fatal, "
1628                                "but please notify on http://bugzilla.lustre.org/\n",
1629                                libcfs_nid2str(peer->nid));
1630                 } else if (server_cksum != client_cksum) {
1631                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1632                                            "%s%s%s inode "DFID" object "
1633                                            LPU64"/"LPU64" extent "
1634                                            "["LPU64"-"LPU64"]\n",
1635                                            req->rq_import->imp_obd->obd_name,
1636                                            libcfs_nid2str(peer->nid),
1637                                            via, router,
1638                                            body->oa.o_valid & OBD_MD_FLFID ?
1639                                                 body->oa.o_parent_seq : (__u64)0,
1640                                            body->oa.o_valid & OBD_MD_FLFID ?
1641                                                 body->oa.o_parent_oid : 0,
1642                                            body->oa.o_valid & OBD_MD_FLFID ?
1643                                                 body->oa.o_parent_ver : 0,
1644                                            body->oa.o_id,
1645                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1646                                                 body->oa.o_seq : (__u64)0,
1647                                            aa->aa_ppga[0]->off,
1648                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1649                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1650                                                                         1);
1651                         CERROR("client %x, server %x, cksum_type %x\n",
1652                                client_cksum, server_cksum, cksum_type);
1653                         cksum_counter = 0;
1654                         aa->aa_oa->o_cksum = client_cksum;
1655                         rc = -EAGAIN;
1656                 } else {
1657                         cksum_counter++;
1658                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1659                         rc = 0;
1660                 }
1661         } else if (unlikely(client_cksum)) {
1662                 static int cksum_missed;
1663
1664                 cksum_missed++;
1665                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1666                         CERROR("Checksum %u requested from %s but not sent\n",
1667                                cksum_missed, libcfs_nid2str(peer->nid));
1668         } else {
1669                 rc = 0;
1670         }
1671 out:
1672         if (rc >= 0)
1673                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1674
1675         RETURN(rc);
1676 }
1677
1678 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1679                             struct lov_stripe_md *lsm,
1680                             obd_count page_count, struct brw_page **pga,
1681                             struct obd_capa *ocapa)
1682 {
1683         struct ptlrpc_request *req;
1684         int                    rc;
1685         cfs_waitq_t            waitq;
1686         int                    resends = 0;
1687         struct l_wait_info     lwi;
1688
1689         ENTRY;
1690
1691         cfs_waitq_init(&waitq);
1692
1693 restart_bulk:
1694         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1695                                   page_count, pga, &req, ocapa, 0, resends);
1696         if (rc != 0)
1697                 return (rc);
1698
1699         rc = ptlrpc_queue_wait(req);
1700
1701         if (rc == -ETIMEDOUT && req->rq_resend) {
1702                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1703                 ptlrpc_req_finished(req);
1704                 goto restart_bulk;
1705         }
1706
1707         rc = osc_brw_fini_request(req, rc);
1708
1709         ptlrpc_req_finished(req);
1710         if (osc_recoverable_error(rc)) {
1711                 resends++;
1712                 if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
1713                         CERROR("too many resend retries, returning error\n");
1714                         RETURN(-EIO);
1715                 }
1716
1717                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1718                 l_wait_event(waitq, 0, &lwi);
1719
1720                 goto restart_bulk;
1721         }
1722
1723         RETURN (rc);
1724 }
1725
1726 int osc_brw_redo_request(struct ptlrpc_request *request,
1727                          struct osc_brw_async_args *aa)
1728 {
1729         struct ptlrpc_request *new_req;
1730         struct ptlrpc_request_set *set = request->rq_set;
1731         struct osc_brw_async_args *new_aa;
1732         struct osc_async_page *oap;
1733         int rc = 0;
1734         ENTRY;
1735
1736         if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
1737                 CERROR("too many resent retries, returning error\n");
1738                 RETURN(-EIO);
1739         }
1740
1741         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1742
1743         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1744                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1745                                   aa->aa_cli, aa->aa_oa,
1746                                   NULL /* lsm unused by osc currently */,
1747                                   aa->aa_page_count, aa->aa_ppga,
1748                                   &new_req, aa->aa_ocapa, 0, 1);
1749         if (rc)
1750                 RETURN(rc);
1751
1752         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1753
1754         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1755                 if (oap->oap_request != NULL) {
1756                         LASSERTF(request == oap->oap_request,
1757                                  "request %p != oap_request %p\n",
1758                                  request, oap->oap_request);
1759                         if (oap->oap_interrupted) {
1760                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1761                                 ptlrpc_req_finished(new_req);
1762                                 RETURN(-EINTR);
1763                         }
1764                 }
1765         }
1766         /* New request takes over pga and oaps from old request.
1767          * Note that copying a list_head doesn't work, need to move it... */
1768         aa->aa_resends++;
1769         new_req->rq_interpret_reply = request->rq_interpret_reply;
1770         new_req->rq_async_args = request->rq_async_args;
1771         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1772
1773         new_aa = ptlrpc_req_async_args(new_req);
1774
1775         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1776         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1777         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1778
1779         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1780                 if (oap->oap_request) {
1781                         ptlrpc_req_finished(oap->oap_request);
1782                         oap->oap_request = ptlrpc_request_addref(new_req);
1783                 }
1784         }
1785
1786         new_aa->aa_ocapa = aa->aa_ocapa;
1787         aa->aa_ocapa = NULL;
1788
1789         /* use ptlrpc_set_add_req is safe because interpret functions work
1790          * in check_set context. only one way exist with access to request
1791          * from different thread got -EINTR - this way protected with
1792          * cl_loi_list_lock */
1793         ptlrpc_set_add_req(set, new_req);
1794
1795         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1796
1797         DEBUG_REQ(D_INFO, new_req, "new request");
1798         RETURN(0);
1799 }
1800
1801 /*
1802  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1803  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1804  * fine for our small page arrays and doesn't require allocation.  its an
1805  * insertion sort that swaps elements that are strides apart, shrinking the
1806  * stride down until its '1' and the array is sorted.
1807  */
1808 static void sort_brw_pages(struct brw_page **array, int num)
1809 {
1810         int stride, i, j;
1811         struct brw_page *tmp;
1812
1813         if (num == 1)
1814                 return;
1815         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1816                 ;
1817
1818         do {
1819                 stride /= 3;
1820                 for (i = stride ; i < num ; i++) {
1821                         tmp = array[i];
1822                         j = i;
1823                         while (j >= stride && array[j - stride]->off > tmp->off) {
1824                                 array[j] = array[j - stride];
1825                                 j -= stride;
1826                         }
1827                         array[j] = tmp;
1828                 }
1829         } while (stride > 1);
1830 }
1831
1832 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1833 {
1834         int count = 1;
1835         int offset;
1836         int i = 0;
1837
1838         LASSERT (pages > 0);
1839         offset = pg[i]->off & ~CFS_PAGE_MASK;
1840
1841         for (;;) {
1842                 pages--;
1843                 if (pages == 0)         /* that's all */
1844                         return count;
1845
1846                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1847                         return count;   /* doesn't end on page boundary */
1848
1849                 i++;
1850                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1851                 if (offset != 0)        /* doesn't start on page boundary */
1852                         return count;
1853
1854                 count++;
1855         }
1856 }
1857
1858 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1859 {
1860         struct brw_page **ppga;
1861         int i;
1862
1863         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1864         if (ppga == NULL)
1865                 return NULL;
1866
1867         for (i = 0; i < count; i++)
1868                 ppga[i] = pga + i;
1869         return ppga;
1870 }
1871
1872 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1873 {
1874         LASSERT(ppga != NULL);
1875         OBD_FREE(ppga, sizeof(*ppga) * count);
1876 }
1877
1878 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1879                    obd_count page_count, struct brw_page *pga,
1880                    struct obd_trans_info *oti)
1881 {
1882         struct obdo *saved_oa = NULL;
1883         struct brw_page **ppga, **orig;
1884         struct obd_import *imp = class_exp2cliimp(exp);
1885         struct client_obd *cli;
1886         int rc, page_count_orig;
1887         ENTRY;
1888
1889         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1890         cli = &imp->imp_obd->u.cli;
1891
1892         if (cmd & OBD_BRW_CHECK) {
1893                 /* The caller just wants to know if there's a chance that this
1894                  * I/O can succeed */
1895
1896                 if (imp->imp_invalid)
1897                         RETURN(-EIO);
1898                 RETURN(0);
1899         }
1900
1901         /* test_brw with a failed create can trip this, maybe others. */
1902         LASSERT(cli->cl_max_pages_per_rpc);
1903
1904         rc = 0;
1905
1906         orig = ppga = osc_build_ppga(pga, page_count);
1907         if (ppga == NULL)
1908                 RETURN(-ENOMEM);
1909         page_count_orig = page_count;
1910
1911         sort_brw_pages(ppga, page_count);
1912         while (page_count) {
1913                 obd_count pages_per_brw;
1914
1915                 if (page_count > cli->cl_max_pages_per_rpc)
1916                         pages_per_brw = cli->cl_max_pages_per_rpc;
1917                 else
1918                         pages_per_brw = page_count;
1919
1920                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1921
1922                 if (saved_oa != NULL) {
1923                         /* restore previously saved oa */
1924                         *oinfo->oi_oa = *saved_oa;
1925                 } else if (page_count > pages_per_brw) {
1926                         /* save a copy of oa (brw will clobber it) */
1927                         OBDO_ALLOC(saved_oa);
1928                         if (saved_oa == NULL)
1929                                 GOTO(out, rc = -ENOMEM);
1930                         *saved_oa = *oinfo->oi_oa;
1931                 }
1932
1933                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1934                                       pages_per_brw, ppga, oinfo->oi_capa);
1935
1936                 if (rc != 0)
1937                         break;
1938
1939                 page_count -= pages_per_brw;
1940                 ppga += pages_per_brw;
1941         }
1942
1943 out:
1944         osc_release_ppga(orig, page_count_orig);
1945
1946         if (saved_oa != NULL)
1947                 OBDO_FREE(saved_oa);
1948
1949         RETURN(rc);
1950 }
1951
1952 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1953  * the dirty accounting.  Writeback completes or truncate happens before
1954  * writing starts.  Must be called with the loi lock held. */
1955 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1956                            int sent)
1957 {
1958         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1959 }
1960
1961
1962 /* This maintains the lists of pending pages to read/write for a given object
1963  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1964  * to quickly find objects that are ready to send an RPC. */
1965 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1966                          int cmd)
1967 {
1968         int optimal;
1969         ENTRY;
1970
1971         if (lop->lop_num_pending == 0)
1972                 RETURN(0);
1973
1974         /* if we have an invalid import we want to drain the queued pages
1975          * by forcing them through rpcs that immediately fail and complete
1976          * the pages.  recovery relies on this to empty the queued pages
1977          * before canceling the locks and evicting down the llite pages */
1978         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1979                 RETURN(1);
1980
1981         /* stream rpcs in queue order as long as as there is an urgent page
1982          * queued.  this is our cheap solution for good batching in the case
1983          * where writepage marks some random page in the middle of the file
1984          * as urgent because of, say, memory pressure */
1985         if (!cfs_list_empty(&lop->lop_urgent)) {
1986                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1987                 RETURN(1);
1988         }
1989         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
1990         optimal = cli->cl_max_pages_per_rpc;
1991         if (cmd & OBD_BRW_WRITE) {
1992                 /* trigger a write rpc stream as long as there are dirtiers
1993                  * waiting for space.  as they're waiting, they're not going to
1994                  * create more pages to coalesce with what's waiting.. */
1995                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
1996                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1997                         RETURN(1);
1998                 }
1999                 /* +16 to avoid triggering rpcs that would want to include pages
2000                  * that are being queued but which can't be made ready until
2001                  * the queuer finishes with the page. this is a wart for
2002                  * llite::commit_write() */
2003                 optimal += 16;
2004         }
2005         if (lop->lop_num_pending >= optimal)
2006                 RETURN(1);
2007
2008         RETURN(0);
2009 }
2010
2011 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2012 {
2013         struct osc_async_page *oap;
2014         ENTRY;
2015
2016         if (cfs_list_empty(&lop->lop_urgent))
2017                 RETURN(0);
2018
2019         oap = cfs_list_entry(lop->lop_urgent.next,
2020                          struct osc_async_page, oap_urgent_item);
2021
2022         if (oap->oap_async_flags & ASYNC_HP) {
2023                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2024                 RETURN(1);
2025         }
2026
2027         RETURN(0);
2028 }
2029
2030 static void on_list(cfs_list_t *item, cfs_list_t *list,
2031                     int should_be_on)
2032 {
2033         if (cfs_list_empty(item) && should_be_on)
2034                 cfs_list_add_tail(item, list);
2035         else if (!cfs_list_empty(item) && !should_be_on)
2036                 cfs_list_del_init(item);
2037 }
2038
2039 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2040  * can find pages to build into rpcs quickly */
2041 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2042 {
2043         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2044             lop_makes_hprpc(&loi->loi_read_lop)) {
2045                 /* HP rpc */
2046                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2047                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2048         } else {
2049                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2050                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2051                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2052                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2053         }
2054
2055         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2056                 loi->loi_write_lop.lop_num_pending);
2057
2058         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2059                 loi->loi_read_lop.lop_num_pending);
2060 }
2061
2062 static void lop_update_pending(struct client_obd *cli,
2063                                struct loi_oap_pages *lop, int cmd, int delta)
2064 {
2065         lop->lop_num_pending += delta;
2066         if (cmd & OBD_BRW_WRITE)
2067                 cli->cl_pending_w_pages += delta;
2068         else
2069                 cli->cl_pending_r_pages += delta;
2070 }
2071
2072 /**
2073  * this is called when a sync waiter receives an interruption.  Its job is to
2074  * get the caller woken as soon as possible.  If its page hasn't been put in an
2075  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2076  * desiring interruption which will forcefully complete the rpc once the rpc
2077  * has timed out.
2078  */
2079 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2080 {
2081         struct loi_oap_pages *lop;
2082         struct lov_oinfo *loi;
2083         int rc = -EBUSY;
2084         ENTRY;
2085
2086         LASSERT(!oap->oap_interrupted);
2087         oap->oap_interrupted = 1;
2088
2089         /* ok, it's been put in an rpc. only one oap gets a request reference */
2090         if (oap->oap_request != NULL) {
2091                 ptlrpc_mark_interrupted(oap->oap_request);
2092                 ptlrpcd_wake(oap->oap_request);
2093                 ptlrpc_req_finished(oap->oap_request);
2094                 oap->oap_request = NULL;
2095         }
2096
2097         /*
2098          * page completion may be called only if ->cpo_prep() method was
2099          * executed by osc_io_submit(), that also adds page the to pending list
2100          */
2101         if (!cfs_list_empty(&oap->oap_pending_item)) {
2102                 cfs_list_del_init(&oap->oap_pending_item);
2103                 cfs_list_del_init(&oap->oap_urgent_item);
2104
2105                 loi = oap->oap_loi;
2106                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2107                         &loi->loi_write_lop : &loi->loi_read_lop;
2108                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2109                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2110                 rc = oap->oap_caller_ops->ap_completion(env,
2111                                           oap->oap_caller_data,
2112                                           oap->oap_cmd, NULL, -EINTR);
2113         }
2114
2115         RETURN(rc);
2116 }
2117
2118 /* this is trying to propogate async writeback errors back up to the
2119  * application.  As an async write fails we record the error code for later if
2120  * the app does an fsync.  As long as errors persist we force future rpcs to be
2121  * sync so that the app can get a sync error and break the cycle of queueing
2122  * pages for which writeback will fail. */
2123 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2124                            int rc)
2125 {
2126         if (rc) {
2127                 if (!ar->ar_rc)
2128                         ar->ar_rc = rc;
2129
2130                 ar->ar_force_sync = 1;
2131                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2132                 return;
2133
2134         }
2135
2136         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2137                 ar->ar_force_sync = 0;
2138 }
2139
2140 void osc_oap_to_pending(struct osc_async_page *oap)
2141 {
2142         struct loi_oap_pages *lop;
2143
2144         if (oap->oap_cmd & OBD_BRW_WRITE)
2145                 lop = &oap->oap_loi->loi_write_lop;
2146         else
2147                 lop = &oap->oap_loi->loi_read_lop;
2148
2149         if (oap->oap_async_flags & ASYNC_HP)
2150                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2151         else if (oap->oap_async_flags & ASYNC_URGENT)
2152                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2153         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2154         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2155 }
2156
2157 /* this must be called holding the loi list lock to give coverage to exit_cache,
2158  * async_flag maintenance, and oap_request */
2159 static void osc_ap_completion(const struct lu_env *env,
2160                               struct client_obd *cli, struct obdo *oa,
2161                               struct osc_async_page *oap, int sent, int rc)
2162 {
2163         __u64 xid = 0;
2164
2165         ENTRY;
2166         if (oap->oap_request != NULL) {
2167                 xid = ptlrpc_req_xid(oap->oap_request);
2168                 ptlrpc_req_finished(oap->oap_request);
2169                 oap->oap_request = NULL;
2170         }
2171
2172         cfs_spin_lock(&oap->oap_lock);
2173         oap->oap_async_flags = 0;
2174         cfs_spin_unlock(&oap->oap_lock);
2175         oap->oap_interrupted = 0;
2176
2177         if (oap->oap_cmd & OBD_BRW_WRITE) {
2178                 osc_process_ar(&cli->cl_ar, xid, rc);
2179                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2180         }
2181
2182         if (rc == 0 && oa != NULL) {
2183                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2184                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2185                 if (oa->o_valid & OBD_MD_FLMTIME)
2186                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2187                 if (oa->o_valid & OBD_MD_FLATIME)
2188                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2189                 if (oa->o_valid & OBD_MD_FLCTIME)
2190                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2191         }
2192
2193         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2194                                                 oap->oap_cmd, oa, rc);
2195
2196         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2197          * I/O on the page could start, but OSC calls it under lock
2198          * and thus we can add oap back to pending safely */
2199         if (rc)
2200                 /* upper layer wants to leave the page on pending queue */
2201                 osc_oap_to_pending(oap);
2202         else
2203                 osc_exit_cache(cli, oap, sent);
2204         EXIT;
2205 }
2206
2207 static int brw_interpret(const struct lu_env *env,
2208                          struct ptlrpc_request *req, void *data, int rc)
2209 {
2210         struct osc_brw_async_args *aa = data;
2211         struct client_obd *cli;
2212         int async;
2213         ENTRY;
2214
2215         rc = osc_brw_fini_request(req, rc);
2216         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2217         if (osc_recoverable_error(rc)) {
2218                 /* Only retry once for mmaped files since the mmaped page
2219                  * might be modified at anytime. We have to retry at least
2220                  * once in case there WAS really a corruption of the page
2221                  * on the network, that was not caused by mmap() modifying
2222                  * the page. Bug11742 */
2223                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2224                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2225                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2226                         rc = 0;
2227                 } else {
2228                         rc = osc_brw_redo_request(req, aa);
2229                         if (rc == 0)
2230                                 RETURN(0);
2231                 }
2232         }
2233
2234         if (aa->aa_ocapa) {
2235                 capa_put(aa->aa_ocapa);
2236                 aa->aa_ocapa = NULL;
2237         }
2238
2239         cli = aa->aa_cli;
2240
2241         client_obd_list_lock(&cli->cl_loi_list_lock);
2242
2243         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2244          * is called so we know whether to go to sync BRWs or wait for more
2245          * RPCs to complete */
2246         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2247                 cli->cl_w_in_flight--;
2248         else
2249                 cli->cl_r_in_flight--;
2250
2251         async = cfs_list_empty(&aa->aa_oaps);
2252         if (!async) { /* from osc_send_oap_rpc() */
2253                 struct osc_async_page *oap, *tmp;
2254                 /* the caller may re-use the oap after the completion call so
2255                  * we need to clean it up a little */
2256                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2257                                              oap_rpc_item) {
2258                         cfs_list_del_init(&oap->oap_rpc_item);
2259                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2260                 }
2261                 OBDO_FREE(aa->aa_oa);
2262         } else { /* from async_internal() */
2263                 obd_count i;
2264                 for (i = 0; i < aa->aa_page_count; i++)
2265                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2266         }
2267         osc_wake_cache_waiters(cli);
2268         osc_check_rpcs(env, cli);
2269         client_obd_list_unlock(&cli->cl_loi_list_lock);
2270         if (!async)
2271                 cl_req_completion(env, aa->aa_clerq, rc);
2272         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2273
2274         RETURN(rc);
2275 }
2276
2277 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2278                                             struct client_obd *cli,
2279                                             cfs_list_t *rpc_list,
2280                                             int page_count, int cmd)
2281 {
2282         struct ptlrpc_request *req;
2283         struct brw_page **pga = NULL;
2284         struct osc_brw_async_args *aa;
2285         struct obdo *oa = NULL;
2286         const struct obd_async_page_ops *ops = NULL;
2287         void *caller_data = NULL;
2288         struct osc_async_page *oap;
2289         struct osc_async_page *tmp;
2290         struct ost_body *body;
2291         struct cl_req *clerq = NULL;
2292         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2293         struct ldlm_lock *lock = NULL;
2294         struct cl_req_attr crattr;
2295         int i, rc, mpflag = 0;
2296
2297         ENTRY;
2298         LASSERT(!cfs_list_empty(rpc_list));
2299
2300         if (cmd & OBD_BRW_MEMALLOC)
2301                 mpflag = cfs_memory_pressure_get_and_set();
2302
2303         memset(&crattr, 0, sizeof crattr);
2304         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2305         if (pga == NULL)
2306                 GOTO(out, req = ERR_PTR(-ENOMEM));
2307
2308         OBDO_ALLOC(oa);
2309         if (oa == NULL)
2310                 GOTO(out, req = ERR_PTR(-ENOMEM));
2311
2312         i = 0;
2313         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2314                 struct cl_page *page = osc_oap2cl_page(oap);
2315                 if (ops == NULL) {
2316                         ops = oap->oap_caller_ops;
2317                         caller_data = oap->oap_caller_data;
2318
2319                         clerq = cl_req_alloc(env, page, crt,
2320                                              1 /* only 1-object rpcs for
2321                                                 * now */);
2322                         if (IS_ERR(clerq))
2323                                 GOTO(out, req = (void *)clerq);
2324                         lock = oap->oap_ldlm_lock;
2325                 }
2326                 pga[i] = &oap->oap_brw_page;
2327                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2328                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2329                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2330                 i++;
2331                 cl_req_page_add(env, clerq, page);
2332         }
2333
2334         /* always get the data for the obdo for the rpc */
2335         LASSERT(ops != NULL);
2336         crattr.cra_oa = oa;
2337         crattr.cra_capa = NULL;
2338         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2339         if (lock) {
2340                 oa->o_handle = lock->l_remote_handle;
2341                 oa->o_valid |= OBD_MD_FLHANDLE;
2342         }
2343
2344         rc = cl_req_prep(env, clerq);
2345         if (rc != 0) {
2346                 CERROR("cl_req_prep failed: %d\n", rc);
2347                 GOTO(out, req = ERR_PTR(rc));
2348         }
2349
2350         sort_brw_pages(pga, page_count);
2351         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2352                                   pga, &req, crattr.cra_capa, 1, 0);
2353         if (rc != 0) {
2354                 CERROR("prep_req failed: %d\n", rc);
2355                 GOTO(out, req = ERR_PTR(rc));
2356         }
2357
2358         if (cmd & OBD_BRW_MEMALLOC)
2359                 req->rq_memalloc = 1;
2360
2361         /* Need to update the timestamps after the request is built in case
2362          * we race with setattr (locally or in queue at OST).  If OST gets
2363          * later setattr before earlier BRW (as determined by the request xid),
2364          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2365          * way to do this in a single call.  bug 10150 */
2366         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2367         cl_req_attr_set(env, clerq, &crattr,
2368                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2369
2370         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2371         aa = ptlrpc_req_async_args(req);
2372         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2373         cfs_list_splice(rpc_list, &aa->aa_oaps);
2374         CFS_INIT_LIST_HEAD(rpc_list);
2375         aa->aa_clerq = clerq;
2376 out:
2377         if (cmd & OBD_BRW_MEMALLOC)
2378                 cfs_memory_pressure_restore(mpflag);
2379
2380         capa_put(crattr.cra_capa);
2381         if (IS_ERR(req)) {
2382                 if (oa)
2383                         OBDO_FREE(oa);
2384                 if (pga)
2385                         OBD_FREE(pga, sizeof(*pga) * page_count);
2386                 /* this should happen rarely and is pretty bad, it makes the
2387                  * pending list not follow the dirty order */
2388                 client_obd_list_lock(&cli->cl_loi_list_lock);
2389                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2390                         cfs_list_del_init(&oap->oap_rpc_item);
2391
2392                         /* queued sync pages can be torn down while the pages
2393                          * were between the pending list and the rpc */
2394                         if (oap->oap_interrupted) {
2395                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2396                                 osc_ap_completion(env, cli, NULL, oap, 0,
2397                                                   oap->oap_count);
2398                                 continue;
2399                         }
2400                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2401                 }
2402                 if (clerq && !IS_ERR(clerq))
2403                         cl_req_completion(env, clerq, PTR_ERR(req));
2404         }
2405         RETURN(req);
2406 }
2407
2408 /**
2409  * prepare pages for ASYNC io and put pages in send queue.
2410  *
2411  * \param cmd OBD_BRW_* macroses
2412  * \param lop pending pages
2413  *
2414  * \return zero if no page added to send queue.
2415  * \return 1 if pages successfully added to send queue.
2416  * \return negative on errors.
2417  */
2418 static int
2419 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2420                  struct lov_oinfo *loi,
2421                  int cmd, struct loi_oap_pages *lop)
2422 {
2423         struct ptlrpc_request *req;
2424         obd_count page_count = 0;
2425         struct osc_async_page *oap = NULL, *tmp;
2426         struct osc_brw_async_args *aa;
2427         const struct obd_async_page_ops *ops;
2428         CFS_LIST_HEAD(rpc_list);
2429         CFS_LIST_HEAD(tmp_list);
2430         unsigned int ending_offset;
2431         unsigned  starting_offset = 0;
2432         int srvlock = 0, mem_tight = 0;
2433         struct cl_object *clob = NULL;
2434         ENTRY;
2435
2436         /* ASYNC_HP pages first. At present, when the lock the pages is
2437          * to be canceled, the pages covered by the lock will be sent out
2438          * with ASYNC_HP. We have to send out them as soon as possible. */
2439         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2440                 if (oap->oap_async_flags & ASYNC_HP) 
2441                         cfs_list_move(&oap->oap_pending_item, &tmp_list);
2442                 else
2443                         cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
2444                 if (++page_count >= cli->cl_max_pages_per_rpc)
2445                         break;
2446         }
2447
2448         cfs_list_splice(&tmp_list, &lop->lop_pending);
2449         page_count = 0;
2450
2451         /* first we find the pages we're allowed to work with */
2452         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2453                                      oap_pending_item) {
2454                 ops = oap->oap_caller_ops;
2455
2456                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2457                          "magic 0x%x\n", oap, oap->oap_magic);
2458
2459                 if (clob == NULL) {
2460                         /* pin object in memory, so that completion call-backs
2461                          * can be safely called under client_obd_list lock. */
2462                         clob = osc_oap2cl_page(oap)->cp_obj;
2463                         cl_object_get(clob);
2464                 }
2465
2466                 if (page_count != 0 &&
2467                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2468                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2469                                " oap %p, page %p, srvlock %u\n",
2470                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2471                         break;
2472                 }
2473
2474                 /* If there is a gap at the start of this page, it can't merge
2475                  * with any previous page, so we'll hand the network a
2476                  * "fragmented" page array that it can't transfer in 1 RDMA */
2477                 if (page_count != 0 && oap->oap_page_off != 0)
2478                         break;
2479
2480                 /* in llite being 'ready' equates to the page being locked
2481                  * until completion unlocks it.  commit_write submits a page
2482                  * as not ready because its unlock will happen unconditionally
2483                  * as the call returns.  if we race with commit_write giving
2484                  * us that page we don't want to create a hole in the page
2485                  * stream, so we stop and leave the rpc to be fired by
2486                  * another dirtier or kupdated interval (the not ready page
2487                  * will still be on the dirty list).  we could call in
2488                  * at the end of ll_file_write to process the queue again. */
2489                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2490                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2491                                                     cmd);
2492                         if (rc < 0)
2493                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2494                                                 "instead of ready\n", oap,
2495                                                 oap->oap_page, rc);
2496                         switch (rc) {
2497                         case -EAGAIN:
2498                                 /* llite is telling us that the page is still
2499                                  * in commit_write and that we should try
2500                                  * and put it in an rpc again later.  we
2501                                  * break out of the loop so we don't create
2502                                  * a hole in the sequence of pages in the rpc
2503                                  * stream.*/
2504                                 oap = NULL;
2505                                 break;
2506                         case -EINTR:
2507                                 /* the io isn't needed.. tell the checks
2508                                  * below to complete the rpc with EINTR */
2509                                 cfs_spin_lock(&oap->oap_lock);
2510                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2511                                 cfs_spin_unlock(&oap->oap_lock);
2512                                 oap->oap_count = -EINTR;
2513                                 break;
2514                         case 0:
2515                                 cfs_spin_lock(&oap->oap_lock);
2516                                 oap->oap_async_flags |= ASYNC_READY;
2517                                 cfs_spin_unlock(&oap->oap_lock);
2518                                 break;
2519                         default:
2520                                 LASSERTF(0, "oap %p page %p returned %d "
2521                                             "from make_ready\n", oap,
2522                                             oap->oap_page, rc);
2523                                 break;
2524                         }
2525                 }
2526                 if (oap == NULL)
2527                         break;
2528                 /*
2529                  * Page submitted for IO has to be locked. Either by
2530                  * ->ap_make_ready() or by higher layers.
2531                  */
2532 #if defined(__KERNEL__) && defined(__linux__)
2533                 {
2534                         struct cl_page *page;
2535
2536                         page = osc_oap2cl_page(oap);
2537
2538                         if (page->cp_type == CPT_CACHEABLE &&
2539                             !(PageLocked(oap->oap_page) &&
2540                               (CheckWriteback(oap->oap_page, cmd)))) {
2541                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2542                                        oap->oap_page,
2543                                        (long)oap->oap_page->flags,
2544                                        oap->oap_async_flags);
2545                                 LBUG();
2546                         }
2547                 }
2548 #endif
2549
2550                 /* take the page out of our book-keeping */
2551                 cfs_list_del_init(&oap->oap_pending_item);
2552                 lop_update_pending(cli, lop, cmd, -1);
2553                 cfs_list_del_init(&oap->oap_urgent_item);
2554
2555                 if (page_count == 0)
2556                         starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
2557                                           (PTLRPC_MAX_BRW_SIZE - 1);
2558
2559                 /* ask the caller for the size of the io as the rpc leaves. */
2560                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2561                         oap->oap_count =
2562                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2563                                                       cmd);
2564                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2565                 }
2566                 if (oap->oap_count <= 0) {
2567                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2568                                oap->oap_count);
2569                         osc_ap_completion(env, cli, NULL,
2570                                           oap, 0, oap->oap_count);
2571                         continue;
2572                 }
2573
2574                 /* now put the page back in our accounting */
2575                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2576                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2577                         mem_tight = 1;
2578                 if (page_count == 0)
2579                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2580                 if (++page_count >= cli->cl_max_pages_per_rpc)
2581                         break;
2582
2583                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2584                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2585                  * have the same alignment as the initial writes that allocated
2586                  * extents on the server. */
2587                 ending_offset = (oap->oap_obj_off + oap->oap_page_off +
2588                                  oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
2589                 if (ending_offset == 0)
2590                         break;
2591
2592                 /* If there is a gap at the end of this page, it can't merge
2593                  * with any subsequent pages, so we'll hand the network a
2594                  * "fragmented" page array that it can't transfer in 1 RDMA */
2595                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2596                         break;
2597         }
2598
2599         osc_wake_cache_waiters(cli);
2600
2601         loi_list_maint(cli, loi);
2602
2603         client_obd_list_unlock(&cli->cl_loi_list_lock);
2604
2605         if (clob != NULL)
2606                 cl_object_put(env, clob);
2607
2608         if (page_count == 0) {
2609                 client_obd_list_lock(&cli->cl_loi_list_lock);
2610                 RETURN(0);
2611         }
2612
2613         req = osc_build_req(env, cli, &rpc_list, page_count,
2614                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2615         if (IS_ERR(req)) {
2616                 LASSERT(cfs_list_empty(&rpc_list));
2617                 loi_list_maint(cli, loi);
2618                 RETURN(PTR_ERR(req));
2619         }
2620
2621         aa = ptlrpc_req_async_args(req);
2622
2623         if (cmd == OBD_BRW_READ) {
2624                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2625                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2626                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2627                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2628         } else {
2629                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2630                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2631                                  cli->cl_w_in_flight);
2632                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2633                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2634         }
2635         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2636
2637         client_obd_list_lock(&cli->cl_loi_list_lock);
2638
2639         if (cmd == OBD_BRW_READ)
2640                 cli->cl_r_in_flight++;
2641         else
2642                 cli->cl_w_in_flight++;
2643
2644         /* queued sync pages can be torn down while the pages
2645          * were between the pending list and the rpc */
2646         tmp = NULL;
2647         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2648                 /* only one oap gets a request reference */
2649                 if (tmp == NULL)
2650                         tmp = oap;
2651                 if (oap->oap_interrupted && !req->rq_intr) {
2652                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2653                                oap, req);
2654                         ptlrpc_mark_interrupted(req);
2655                 }
2656         }
2657         if (tmp != NULL)
2658                 tmp->oap_request = ptlrpc_request_addref(req);
2659
2660         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2661                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2662
2663         req->rq_interpret_reply = brw_interpret;
2664         ptlrpcd_add_req(req, PSCOPE_BRW);
2665         RETURN(1);
2666 }
2667
2668 #define LOI_DEBUG(LOI, STR, args...)                                     \
2669         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2670                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2671                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2672                (LOI)->loi_write_lop.lop_num_pending,                     \
2673                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2674                (LOI)->loi_read_lop.lop_num_pending,                      \
2675                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2676                args)                                                     \
2677
2678 /* This is called by osc_check_rpcs() to find which objects have pages that
2679  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2680 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2681 {
2682         ENTRY;
2683
2684         /* First return objects that have blocked locks so that they
2685          * will be flushed quickly and other clients can get the lock,
2686          * then objects which have pages ready to be stuffed into RPCs */
2687         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2688                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2689                                       struct lov_oinfo, loi_hp_ready_item));
2690         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2691                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2692                                       struct lov_oinfo, loi_ready_item));
2693
2694         /* then if we have cache waiters, return all objects with queued
2695          * writes.  This is especially important when many small files
2696          * have filled up the cache and not been fired into rpcs because
2697          * they don't pass the nr_pending/object threshhold */
2698         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2699             !cfs_list_empty(&cli->cl_loi_write_list))
2700                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2701                                       struct lov_oinfo, loi_write_item));
2702
2703         /* then return all queued objects when we have an invalid import
2704          * so that they get flushed */
2705         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2706                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2707                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2708                                               struct lov_oinfo,
2709                                               loi_write_item));
2710                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2711                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2712                                               struct lov_oinfo, loi_read_item));
2713         }
2714         RETURN(NULL);
2715 }
2716
2717 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2718 {
2719         struct osc_async_page *oap;
2720         int hprpc = 0;
2721
2722         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2723                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2724                                      struct osc_async_page, oap_urgent_item);
2725                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2726         }
2727
2728         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2729                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2730                                      struct osc_async_page, oap_urgent_item);
2731                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2732         }
2733
2734         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2735 }
2736
2737 /* called with the loi list lock held */
2738 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2739 {
2740         struct lov_oinfo *loi;
2741         int rc = 0, race_counter = 0;
2742         ENTRY;
2743
2744         while ((loi = osc_next_loi(cli)) != NULL) {
2745                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2746
2747                 if (osc_max_rpc_in_flight(cli, loi))
2748                         break;
2749
2750                 /* attempt some read/write balancing by alternating between
2751                  * reads and writes in an object.  The makes_rpc checks here
2752                  * would be redundant if we were getting read/write work items
2753                  * instead of objects.  we don't want send_oap_rpc to drain a
2754                  * partial read pending queue when we're given this object to
2755                  * do io on writes while there are cache waiters */
2756                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2757                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2758                                               &loi->loi_write_lop);
2759                         if (rc < 0) {
2760                                 CERROR("Write request failed with %d\n", rc);
2761
2762                                 /* osc_send_oap_rpc failed, mostly because of
2763                                  * memory pressure.
2764                                  *
2765                                  * It can't break here, because if:
2766                                  *  - a page was submitted by osc_io_submit, so
2767                                  *    page locked;
2768                                  *  - no request in flight
2769                                  *  - no subsequent request
2770                                  * The system will be in live-lock state,
2771                                  * because there is no chance to call
2772                                  * osc_io_unplug() and osc_check_rpcs() any
2773                                  * more. pdflush can't help in this case,
2774                                  * because it might be blocked at grabbing
2775                                  * the page lock as we mentioned.
2776                                  *
2777                                  * Anyway, continue to drain pages. */
2778                                 /* break; */
2779                         }
2780
2781                         if (rc > 0)
2782                                 race_counter = 0;
2783                         else
2784                                 race_counter++;
2785                 }
2786                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2787                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2788                                               &loi->loi_read_lop);
2789                         if (rc < 0)
2790                                 CERROR("Read request failed with %d\n", rc);
2791
2792                         if (rc > 0)
2793                                 race_counter = 0;
2794                         else
2795                                 race_counter++;
2796                 }
2797
2798                 /* attempt some inter-object balancing by issuing rpcs
2799                  * for each object in turn */
2800                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2801                         cfs_list_del_init(&loi->loi_hp_ready_item);
2802                 if (!cfs_list_empty(&loi->loi_ready_item))
2803                         cfs_list_del_init(&loi->loi_ready_item);
2804                 if (!cfs_list_empty(&loi->loi_write_item))
2805                         cfs_list_del_init(&loi->loi_write_item);
2806                 if (!cfs_list_empty(&loi->loi_read_item))
2807                         cfs_list_del_init(&loi->loi_read_item);
2808
2809                 loi_list_maint(cli, loi);
2810
2811                 /* send_oap_rpc fails with 0 when make_ready tells it to
2812                  * back off.  llite's make_ready does this when it tries
2813                  * to lock a page queued for write that is already locked.
2814                  * we want to try sending rpcs from many objects, but we
2815                  * don't want to spin failing with 0.  */
2816                 if (race_counter == 10)
2817                         break;
2818         }
2819         EXIT;
2820 }
2821
2822 /* we're trying to queue a page in the osc so we're subject to the
2823  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2824  * If the osc's queued pages are already at that limit, then we want to sleep
2825  * until there is space in the osc's queue for us.  We also may be waiting for
2826  * write credits from the OST if there are RPCs in flight that may return some
2827  * before we fall back to sync writes.
2828  *
2829  * We need this know our allocation was granted in the presence of signals */
2830 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2831 {
2832         int rc;
2833         ENTRY;
2834         client_obd_list_lock(&cli->cl_loi_list_lock);
2835         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2836         client_obd_list_unlock(&cli->cl_loi_list_lock);
2837         RETURN(rc);
2838 };
2839
2840 /**
2841  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2842  * is available.
2843  */
2844 int osc_enter_cache_try(const struct lu_env *env,
2845                         struct client_obd *cli, struct lov_oinfo *loi,
2846                         struct osc_async_page *oap, int transient)
2847 {
2848         int has_grant;
2849
2850         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2851         if (has_grant) {
2852                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2853                 if (transient) {
2854                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2855                         cfs_atomic_inc(&obd_dirty_transit_pages);
2856                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2857                 }
2858         }
2859         return has_grant;
2860 }
2861
2862 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2863  * grant or cache space. */
2864 static int osc_enter_cache(const struct lu_env *env,
2865                            struct client_obd *cli, struct lov_oinfo *loi,
2866                            struct osc_async_page *oap)
2867 {
2868         struct osc_cache_waiter ocw;
2869         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2870
2871         ENTRY;
2872
2873         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2874                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2875                cli->cl_dirty_max, obd_max_dirty_pages,
2876                cli->cl_lost_grant, cli->cl_avail_grant);
2877
2878         /* force the caller to try sync io.  this can jump the list
2879          * of queued writes and create a discontiguous rpc stream */
2880         if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
2881             loi->loi_ar.ar_force_sync)
2882                 RETURN(-EDQUOT);
2883
2884         /* Hopefully normal case - cache space and write credits available */
2885         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2886             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2887             osc_enter_cache_try(env, cli, loi, oap, 0))
2888                 RETURN(0);
2889
2890         /* It is safe to block as a cache waiter as long as there is grant
2891          * space available or the hope of additional grant being returned
2892          * when an in flight write completes.  Using the write back cache
2893          * if possible is preferable to sending the data synchronously
2894          * because write pages can then be merged in to large requests.
2895          * The addition of this cache waiter will causing pending write
2896          * pages to be sent immediately. */
2897         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2898                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2899                 cfs_waitq_init(&ocw.ocw_waitq);
2900                 ocw.ocw_oap = oap;
2901                 ocw.ocw_rc = 0;
2902
2903                 loi_list_maint(cli, loi);
2904                 osc_check_rpcs(env, cli);
2905                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2906
2907                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2908                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2909
2910                 client_obd_list_lock(&cli->cl_loi_list_lock);
2911                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2912                         cfs_list_del(&ocw.ocw_entry);
2913                         RETURN(-EINTR);
2914                 }
2915                 RETURN(ocw.ocw_rc);
2916         }
2917
2918         RETURN(-EDQUOT);
2919 }
2920
2921
2922 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2923                         struct lov_oinfo *loi, cfs_page_t *page,
2924                         obd_off offset, const struct obd_async_page_ops *ops,
2925                         void *data, void **res, int nocache,
2926                         struct lustre_handle *lockh)
2927 {
2928         struct osc_async_page *oap;
2929
2930         ENTRY;
2931
2932         if (!page)
2933                 return cfs_size_round(sizeof(*oap));
2934
2935         oap = *res;
2936         oap->oap_magic = OAP_MAGIC;
2937         oap->oap_cli = &exp->exp_obd->u.cli;
2938         oap->oap_loi = loi;
2939
2940         oap->oap_caller_ops = ops;
2941         oap->oap_caller_data = data;
2942
2943         oap->oap_page = page;
2944         oap->oap_obj_off = offset;
2945         if (!client_is_remote(exp) &&
2946             cfs_capable(CFS_CAP_SYS_RESOURCE))
2947                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2948
2949         LASSERT(!(offset & ~CFS_PAGE_MASK));
2950
2951         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2952         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2953         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2954         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2955
2956         cfs_spin_lock_init(&oap->oap_lock);
2957         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2958         RETURN(0);
2959 }
2960
2961 struct osc_async_page *oap_from_cookie(void *cookie)
2962 {
2963         struct osc_async_page *oap = cookie;
2964         if (oap->oap_magic != OAP_MAGIC)
2965                 return ERR_PTR(-EINVAL);
2966         return oap;
2967 };
2968
2969 int osc_queue_async_io(const struct lu_env *env,
2970                        struct obd_export *exp, struct lov_stripe_md *lsm,
2971                        struct lov_oinfo *loi, void *cookie,
2972                        int cmd, obd_off off, int count,
2973                        obd_flag brw_flags, enum async_flags async_flags)
2974 {
2975         struct client_obd *cli = &exp->exp_obd->u.cli;
2976         struct osc_async_page *oap;
2977         int rc = 0;
2978         ENTRY;
2979
2980         oap = oap_from_cookie(cookie);
2981         if (IS_ERR(oap))
2982                 RETURN(PTR_ERR(oap));
2983
2984         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2985                 RETURN(-EIO);
2986
2987         if (!cfs_list_empty(&oap->oap_pending_item) ||
2988             !cfs_list_empty(&oap->oap_urgent_item) ||
2989             !cfs_list_empty(&oap->oap_rpc_item))
2990                 RETURN(-EBUSY);
2991
2992         /* check if the file's owner/group is over quota */
2993         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2994                 struct cl_object *obj;
2995                 struct cl_attr    attr; /* XXX put attr into thread info */
2996                 unsigned int qid[MAXQUOTAS];
2997
2998                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
2999
3000                 cl_object_attr_lock(obj);
3001                 rc = cl_object_attr_get(env, obj, &attr);
3002                 cl_object_attr_unlock(obj);
3003
3004                 qid[USRQUOTA] = attr.cat_uid;
3005                 qid[GRPQUOTA] = attr.cat_gid;
3006                 if (rc == 0 &&
3007                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3008                         rc = -EDQUOT;
3009                 if (rc)
3010                         RETURN(rc);
3011         }
3012
3013         if (loi == NULL)
3014                 loi = lsm->lsm_oinfo[0];
3015
3016         client_obd_list_lock(&cli->cl_loi_list_lock);
3017
3018         LASSERT(off + count <= CFS_PAGE_SIZE);
3019         oap->oap_cmd = cmd;
3020         oap->oap_page_off = off;
3021         oap->oap_count = count;
3022         oap->oap_brw_flags = brw_flags;
3023         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3024         if (cfs_memory_pressure_get())
3025                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3026         cfs_spin_lock(&oap->oap_lock);
3027         oap->oap_async_flags = async_flags;
3028         cfs_spin_unlock(&oap->oap_lock);
3029
3030         if (cmd & OBD_BRW_WRITE) {
3031                 rc = osc_enter_cache(env, cli, loi, oap);
3032                 if (rc) {
3033                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3034                         RETURN(rc);
3035                 }
3036         }
3037
3038         osc_oap_to_pending(oap);
3039         loi_list_maint(cli, loi);
3040
3041         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3042                   cmd);
3043
3044         osc_check_rpcs(env, cli);
3045         client_obd_list_unlock(&cli->cl_loi_list_lock);
3046
3047         RETURN(0);
3048 }
3049
3050 /* aka (~was & now & flag), but this is more clear :) */
3051 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3052
3053 int osc_set_async_flags_base(struct client_obd *cli,
3054                              struct lov_oinfo *loi, struct osc_async_page *oap,
3055                              obd_flag async_flags)
3056 {
3057         struct loi_oap_pages *lop;
3058         int flags = 0;
3059         ENTRY;
3060
3061         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3062
3063         if (oap->oap_cmd & OBD_BRW_WRITE) {
3064                 lop = &loi->loi_write_lop;
3065         } else {
3066                 lop = &loi->loi_read_lop;
3067         }
3068
3069         if ((oap->oap_async_flags & async_flags) == async_flags)
3070                 RETURN(0);
3071
3072         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3073                 flags |= ASYNC_READY;
3074
3075         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3076             cfs_list_empty(&oap->oap_rpc_item)) {
3077                 if (oap->oap_async_flags & ASYNC_HP)
3078                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3079                 else
3080                         cfs_list_add_tail(&oap->oap_urgent_item,
3081                                           &lop->lop_urgent);
3082                 flags |= ASYNC_URGENT;
3083                 loi_list_maint(cli, loi);
3084         }
3085         cfs_spin_lock(&oap->oap_lock);
3086         oap->oap_async_flags |= flags;
3087         cfs_spin_unlock(&oap->oap_lock);
3088
3089         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3090                         oap->oap_async_flags);
3091         RETURN(0);
3092 }
3093
3094 int osc_teardown_async_page(struct obd_export *exp,
3095                             struct lov_stripe_md *lsm,
3096                             struct lov_oinfo *loi, void *cookie)
3097 {
3098         struct client_obd *cli = &exp->exp_obd->u.cli;
3099         struct loi_oap_pages *lop;
3100         struct osc_async_page *oap;
3101         int rc = 0;
3102         ENTRY;
3103
3104         oap = oap_from_cookie(cookie);
3105         if (IS_ERR(oap))
3106                 RETURN(PTR_ERR(oap));
3107
3108         if (loi == NULL)
3109                 loi = lsm->lsm_oinfo[0];
3110
3111         if (oap->oap_cmd & OBD_BRW_WRITE) {
3112                 lop = &loi->loi_write_lop;
3113         } else {
3114                 lop = &loi->loi_read_lop;
3115         }
3116
3117         client_obd_list_lock(&cli->cl_loi_list_lock);
3118
3119         if (!cfs_list_empty(&oap->oap_rpc_item))
3120                 GOTO(out, rc = -EBUSY);
3121
3122         osc_exit_cache(cli, oap, 0);
3123         osc_wake_cache_waiters(cli);
3124
3125         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3126                 cfs_list_del_init(&oap->oap_urgent_item);
3127                 cfs_spin_lock(&oap->oap_lock);
3128                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3129                 cfs_spin_unlock(&oap->oap_lock);
3130         }
3131         if (!cfs_list_empty(&oap->oap_pending_item)) {
3132                 cfs_list_del_init(&oap->oap_pending_item);
3133                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3134         }
3135         loi_list_maint(cli, loi);
3136         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3137 out:
3138         client_obd_list_unlock(&cli->cl_loi_list_lock);
3139         RETURN(rc);
3140 }
3141
3142 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3143                                         struct ldlm_enqueue_info *einfo)
3144 {
3145         void *data = einfo->ei_cbdata;
3146         int set = 0;
3147
3148         LASSERT(lock != NULL);
3149         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3150         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3151         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3152         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3153
3154         lock_res_and_lock(lock);
3155         cfs_spin_lock(&osc_ast_guard);
3156
3157         if (lock->l_ast_data == NULL)
3158                 lock->l_ast_data = data;
3159         if (lock->l_ast_data == data)
3160                 set = 1;
3161
3162         cfs_spin_unlock(&osc_ast_guard);
3163         unlock_res_and_lock(lock);
3164
3165         return set;
3166 }
3167
3168 static int osc_set_data_with_check(struct lustre_handle *lockh,
3169                                    struct ldlm_enqueue_info *einfo)
3170 {
3171         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3172         int set = 0;
3173
3174         if (lock != NULL) {
3175                 set = osc_set_lock_data_with_check(lock, einfo);
3176                 LDLM_LOCK_PUT(lock);
3177         } else
3178                 CERROR("lockh %p, data %p - client evicted?\n",
3179                        lockh, einfo->ei_cbdata);
3180         return set;
3181 }
3182
3183 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3184                              ldlm_iterator_t replace, void *data)
3185 {
3186         struct ldlm_res_id res_id;
3187         struct obd_device *obd = class_exp2obd(exp);
3188
3189         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3190         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3191         return 0;
3192 }
3193
3194 /* find any ldlm lock of the inode in osc
3195  * return 0    not find
3196  *        1    find one
3197  *      < 0    error */
3198 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3199                            ldlm_iterator_t replace, void *data)
3200 {
3201         struct ldlm_res_id res_id;
3202         struct obd_device *obd = class_exp2obd(exp);
3203         int rc = 0;
3204
3205         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3206         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3207         if (rc == LDLM_ITER_STOP)
3208                 return(1);
3209         if (rc == LDLM_ITER_CONTINUE)
3210                 return(0);
3211         return(rc);
3212 }
3213
3214 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3215                             obd_enqueue_update_f upcall, void *cookie,
3216                             int *flags, int rc)
3217 {
3218         int intent = *flags & LDLM_FL_HAS_INTENT;
3219         ENTRY;
3220
3221         if (intent) {
3222                 /* The request was created before ldlm_cli_enqueue call. */
3223                 if (rc == ELDLM_LOCK_ABORTED) {
3224                         struct ldlm_reply *rep;
3225                         rep = req_capsule_server_get(&req->rq_pill,
3226                                                      &RMF_DLM_REP);
3227
3228                         LASSERT(rep != NULL);
3229                         if (rep->lock_policy_res1)
3230                                 rc = rep->lock_policy_res1;
3231                 }
3232         }
3233
3234         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3235                 *flags |= LDLM_FL_LVB_READY;
3236                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3237                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3238         }
3239
3240         /* Call the update callback. */
3241         rc = (*upcall)(cookie, rc);
3242         RETURN(rc);
3243 }
3244
3245 static int osc_enqueue_interpret(const struct lu_env *env,
3246                                  struct ptlrpc_request *req,
3247                                  struct osc_enqueue_args *aa, int rc)
3248 {
3249         struct ldlm_lock *lock;
3250         struct lustre_handle handle;
3251         __u32 mode;
3252
3253         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3254          * might be freed anytime after lock upcall has been called. */
3255         lustre_handle_copy(&handle, aa->oa_lockh);
3256         mode = aa->oa_ei->ei_mode;
3257
3258         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3259          * be valid. */
3260         lock = ldlm_handle2lock(&handle);
3261
3262         /* Take an additional reference so that a blocking AST that
3263          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3264          * to arrive after an upcall has been executed by
3265          * osc_enqueue_fini(). */
3266         ldlm_lock_addref(&handle, mode);
3267
3268         /* Let CP AST to grant the lock first. */
3269         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3270
3271         /* Complete obtaining the lock procedure. */
3272         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3273                                    mode, aa->oa_flags, aa->oa_lvb,
3274                                    sizeof(*aa->oa_lvb), &handle, rc);
3275         /* Complete osc stuff. */
3276         rc = osc_enqueue_fini(req, aa->oa_lvb,
3277                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3278
3279         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3280
3281         /* Release the lock for async request. */
3282         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3283                 /*
3284                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3285                  * not already released by
3286                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3287                  */
3288                 ldlm_lock_decref(&handle, mode);
3289
3290         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3291                  aa->oa_lockh, req, aa);
3292         ldlm_lock_decref(&handle, mode);
3293         LDLM_LOCK_PUT(lock);
3294         return rc;
3295 }
3296
3297 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3298                         struct lov_oinfo *loi, int flags,
3299                         struct ost_lvb *lvb, __u32 mode, int rc)
3300 {
3301         if (rc == ELDLM_OK) {
3302                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3303                 __u64 tmp;
3304
3305                 LASSERT(lock != NULL);
3306                 loi->loi_lvb = *lvb;
3307                 tmp = loi->loi_lvb.lvb_size;
3308                 /* Extend KMS up to the end of this lock and no further
3309                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3310                 if (tmp > lock->l_policy_data.l_extent.end)
3311                         tmp = lock->l_policy_data.l_extent.end + 1;
3312                 if (tmp >= loi->loi_kms) {
3313                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3314                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3315                         loi_kms_set(loi, tmp);
3316                 } else {
3317                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3318                                    LPU64"; leaving kms="LPU64", end="LPU64,
3319                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3320                                    lock->l_policy_data.l_extent.end);
3321                 }
3322                 ldlm_lock_allow_match(lock);
3323                 LDLM_LOCK_PUT(lock);
3324         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3325                 loi->loi_lvb = *lvb;
3326                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3327                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3328                 rc = ELDLM_OK;
3329         }
3330 }
3331 EXPORT_SYMBOL(osc_update_enqueue);
3332
3333 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3334
3335 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3336  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3337  * other synchronous requests, however keeping some locks and trying to obtain
3338  * others may take a considerable amount of time in a case of ost failure; and
3339  * when other sync requests do not get released lock from a client, the client
3340  * is excluded from the cluster -- such scenarious make the life difficult, so
3341  * release locks just after they are obtained. */
3342 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3343                      int *flags, ldlm_policy_data_t *policy,
3344                      struct ost_lvb *lvb, int kms_valid,
3345                      obd_enqueue_update_f upcall, void *cookie,
3346                      struct ldlm_enqueue_info *einfo,
3347                      struct lustre_handle *lockh,
3348                      struct ptlrpc_request_set *rqset, int async)
3349 {
3350         struct obd_device *obd = exp->exp_obd;
3351         struct ptlrpc_request *req = NULL;
3352         int intent = *flags & LDLM_FL_HAS_INTENT;
3353         ldlm_mode_t mode;
3354         int rc;
3355         ENTRY;
3356
3357         /* Filesystem lock extents are extended to page boundaries so that
3358          * dealing with the page cache is a little smoother.  */
3359         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3360         policy->l_extent.end |= ~CFS_PAGE_MASK;
3361
3362         /*
3363          * kms is not valid when either object is completely fresh (so that no
3364          * locks are cached), or object was evicted. In the latter case cached
3365          * lock cannot be used, because it would prime inode state with
3366          * potentially stale LVB.
3367          */
3368         if (!kms_valid)
3369                 goto no_match;
3370
3371         /* Next, search for already existing extent locks that will cover us */
3372         /* If we're trying to read, we also search for an existing PW lock.  The
3373          * VFS and page cache already protect us locally, so lots of readers/
3374          * writers can share a single PW lock.
3375          *
3376          * There are problems with conversion deadlocks, so instead of
3377          * converting a read lock to a write lock, we'll just enqueue a new
3378          * one.
3379          *
3380          * At some point we should cancel the read lock instead of making them
3381          * send us a blocking callback, but there are problems with canceling
3382          * locks out from other users right now, too. */
3383         mode = einfo->ei_mode;
3384         if (einfo->ei_mode == LCK_PR)
3385                 mode |= LCK_PW;
3386         mode = ldlm_lock_match(obd->obd_namespace,
3387                                *flags | LDLM_FL_LVB_READY, res_id,
3388                                einfo->ei_type, policy, mode, lockh, 0);
3389         if (mode) {
3390                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3391
3392                 if (osc_set_lock_data_with_check(matched, einfo)) {
3393                         /* addref the lock only if not async requests and PW
3394                          * lock is matched whereas we asked for PR. */
3395                         if (!rqset && einfo->ei_mode != mode)
3396                                 ldlm_lock_addref(lockh, LCK_PR);
3397                         if (intent) {
3398                                 /* I would like to be able to ASSERT here that
3399                                  * rss <= kms, but I can't, for reasons which
3400                                  * are explained in lov_enqueue() */
3401                         }
3402
3403                         /* We already have a lock, and it's referenced */
3404                         (*upcall)(cookie, ELDLM_OK);
3405
3406                         /* For async requests, decref the lock. */
3407                         if (einfo->ei_mode != mode)
3408                                 ldlm_lock_decref(lockh, LCK_PW);
3409                         else if (rqset)
3410                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3411                         LDLM_LOCK_PUT(matched);
3412                         RETURN(ELDLM_OK);
3413                 } else
3414                         ldlm_lock_decref(lockh, mode);
3415                 LDLM_LOCK_PUT(matched);
3416         }
3417
3418  no_match:
3419         if (intent) {
3420                 CFS_LIST_HEAD(cancels);
3421                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3422                                            &RQF_LDLM_ENQUEUE_LVB);
3423                 if (req == NULL)
3424                         RETURN(-ENOMEM);
3425
3426                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3427                 if (rc) {
3428                         ptlrpc_request_free(req);
3429                         RETURN(rc);
3430                 }
3431
3432                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3433                                      sizeof *lvb);
3434                 ptlrpc_request_set_replen(req);
3435         }
3436
3437         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3438         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3439
3440         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3441                               sizeof(*lvb), lockh, async);
3442         if (rqset) {
3443                 if (!rc) {
3444                         struct osc_enqueue_args *aa;
3445                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3446                         aa = ptlrpc_req_async_args(req);
3447                         aa->oa_ei = einfo;
3448                         aa->oa_exp = exp;
3449                         aa->oa_flags  = flags;
3450                         aa->oa_upcall = upcall;
3451                         aa->oa_cookie = cookie;
3452                         aa->oa_lvb    = lvb;
3453                         aa->oa_lockh  = lockh;
3454
3455                         req->rq_interpret_reply =
3456                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3457                         if (rqset == PTLRPCD_SET)
3458                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3459                         else
3460                                 ptlrpc_set_add_req(rqset, req);
3461                 } else if (intent) {
3462                         ptlrpc_req_finished(req);
3463                 }
3464                 RETURN(rc);
3465         }
3466
3467         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3468         if (intent)
3469                 ptlrpc_req_finished(req);
3470
3471         RETURN(rc);
3472 }
3473
3474 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3475                        struct ldlm_enqueue_info *einfo,
3476                        struct ptlrpc_request_set *rqset)
3477 {
3478         struct ldlm_res_id res_id;
3479         int rc;
3480         ENTRY;
3481
3482         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3483                            oinfo->oi_md->lsm_object_seq, &res_id);
3484
3485         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3486                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3487                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3488                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3489                               rqset, rqset != NULL);
3490         RETURN(rc);
3491 }
3492
3493 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3494                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3495                    int *flags, void *data, struct lustre_handle *lockh,
3496                    int unref)
3497 {
3498         struct obd_device *obd = exp->exp_obd;
3499         int lflags = *flags;
3500         ldlm_mode_t rc;
3501         ENTRY;
3502
3503         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3504                 RETURN(-EIO);
3505
3506         /* Filesystem lock extents are extended to page boundaries so that
3507          * dealing with the page cache is a little smoother */
3508         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3509         policy->l_extent.end |= ~CFS_PAGE_MASK;
3510
3511         /* Next, search for already existing extent locks that will cover us */
3512         /* If we're trying to read, we also search for an existing PW lock.  The
3513          * VFS and page cache already protect us locally, so lots of readers/
3514          * writers can share a single PW lock. */
3515         rc = mode;
3516         if (mode == LCK_PR)
3517                 rc |= LCK_PW;
3518         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3519                              res_id, type, policy, rc, lockh, unref);
3520         if (rc) {
3521                 if (data != NULL) {
3522                         if (!osc_set_data_with_check(lockh, data)) {
3523                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3524                                         ldlm_lock_decref(lockh, rc);
3525                                 RETURN(0);
3526                         }
3527                 }
3528                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3529                         ldlm_lock_addref(lockh, LCK_PR);
3530                         ldlm_lock_decref(lockh, LCK_PW);
3531                 }
3532                 RETURN(rc);
3533         }
3534         RETURN(rc);
3535 }
3536
3537 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3538 {
3539         ENTRY;
3540
3541         if (unlikely(mode == LCK_GROUP))
3542                 ldlm_lock_decref_and_cancel(lockh, mode);
3543         else
3544                 ldlm_lock_decref(lockh, mode);
3545
3546         RETURN(0);
3547 }
3548
3549 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3550                       __u32 mode, struct lustre_handle *lockh)
3551 {
3552         ENTRY;
3553         RETURN(osc_cancel_base(lockh, mode));
3554 }
3555
3556 static int osc_cancel_unused(struct obd_export *exp,
3557                              struct lov_stripe_md *lsm,
3558                              ldlm_cancel_flags_t flags,
3559                              void *opaque)
3560 {
3561         struct obd_device *obd = class_exp2obd(exp);
3562         struct ldlm_res_id res_id, *resp = NULL;
3563
3564         if (lsm != NULL) {
3565                 resp = osc_build_res_name(lsm->lsm_object_id,
3566                                           lsm->lsm_object_seq, &res_id);
3567         }
3568
3569         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3570 }
3571
3572 static int osc_statfs_interpret(const struct lu_env *env,
3573                                 struct ptlrpc_request *req,
3574                                 struct osc_async_args *aa, int rc)
3575 {
3576         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3577         struct obd_statfs *msfs;
3578         __u64 used;
3579         ENTRY;
3580
3581         if (rc == -EBADR)
3582                 /* The request has in fact never been sent
3583                  * due to issues at a higher level (LOV).
3584                  * Exit immediately since the caller is
3585                  * aware of the problem and takes care
3586                  * of the clean up */
3587                  RETURN(rc);
3588
3589         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3590             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3591                 GOTO(out, rc = 0);
3592
3593         if (rc != 0)
3594                 GOTO(out, rc);
3595
3596         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3597         if (msfs == NULL) {
3598                 GOTO(out, rc = -EPROTO);
3599         }
3600
3601         /* Reinitialize the RDONLY and DEGRADED flags at the client
3602          * on each statfs, so they don't stay set permanently. */
3603         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3604
3605         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3606                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3607         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3608                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3609
3610         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3611                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3612         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3613                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3614
3615         /* Add a bit of hysteresis so this flag isn't continually flapping,
3616          * and ensure that new files don't get extremely fragmented due to
3617          * only a small amount of available space in the filesystem.
3618          * We want to set the NOSPC flag when there is less than ~0.1% free
3619          * and clear it when there is at least ~0.2% free space, so:
3620          *                   avail < ~0.1% max          max = avail + used
3621          *            1025 * avail < avail + used       used = blocks - free
3622          *            1024 * avail < used
3623          *            1024 * avail < blocks - free                      
3624          *                   avail < ((blocks - free) >> 10)    
3625          *
3626          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3627          * lose that amount of space so in those cases we report no space left
3628          * if their is less than 1 GB left.                             */
3629         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3630         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3631                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3632                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3633         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3634                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3635                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3636
3637         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3638
3639         *aa->aa_oi->oi_osfs = *msfs;
3640 out:
3641         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3642         RETURN(rc);
3643 }
3644
3645 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3646                             __u64 max_age, struct ptlrpc_request_set *rqset)
3647 {
3648         struct ptlrpc_request *req;
3649         struct osc_async_args *aa;
3650         int                    rc;
3651         ENTRY;
3652
3653         /* We could possibly pass max_age in the request (as an absolute
3654          * timestamp or a "seconds.usec ago") so the target can avoid doing
3655          * extra calls into the filesystem if that isn't necessary (e.g.
3656          * during mount that would help a bit).  Having relative timestamps
3657          * is not so great if request processing is slow, while absolute
3658          * timestamps are not ideal because they need time synchronization. */
3659         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3660         if (req == NULL)
3661                 RETURN(-ENOMEM);
3662
3663         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3664         if (rc) {
3665                 ptlrpc_request_free(req);
3666                 RETURN(rc);
3667         }
3668         ptlrpc_request_set_replen(req);
3669         req->rq_request_portal = OST_CREATE_PORTAL;
3670         ptlrpc_at_set_req_timeout(req);
3671
3672         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3673                 /* procfs requests not want stat in wait for avoid deadlock */
3674                 req->rq_no_resend = 1;
3675                 req->rq_no_delay = 1;
3676         }
3677
3678         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3679         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3680         aa = ptlrpc_req_async_args(req);
3681         aa->aa_oi = oinfo;
3682
3683         ptlrpc_set_add_req(rqset, req);
3684         RETURN(0);
3685 }
3686
3687 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3688                       __u64 max_age, __u32 flags)
3689 {
3690         struct obd_statfs     *msfs;
3691         struct ptlrpc_request *req;
3692         struct obd_import     *imp = NULL;
3693         int rc;
3694         ENTRY;
3695
3696         /*Since the request might also come from lprocfs, so we need
3697          *sync this with client_disconnect_export Bug15684*/
3698         cfs_down_read(&obd->u.cli.cl_sem);
3699         if (obd->u.cli.cl_import)
3700                 imp = class_import_get(obd->u.cli.cl_import);
3701         cfs_up_read(&obd->u.cli.cl_sem);
3702         if (!imp)
3703                 RETURN(-ENODEV);
3704
3705         /* We could possibly pass max_age in the request (as an absolute
3706          * timestamp or a "seconds.usec ago") so the target can avoid doing
3707          * extra calls into the filesystem if that isn't necessary (e.g.
3708          * during mount that would help a bit).  Having relative timestamps
3709          * is not so great if request processing is slow, while absolute
3710          * timestamps are not ideal because they need time synchronization. */
3711         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3712
3713         class_import_put(imp);
3714
3715         if (req == NULL)
3716                 RETURN(-ENOMEM);
3717
3718         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3719         if (rc) {
3720                 ptlrpc_request_free(req);
3721                 RETURN(rc);
3722         }
3723         ptlrpc_request_set_replen(req);
3724         req->rq_request_portal = OST_CREATE_PORTAL;
3725         ptlrpc_at_set_req_timeout(req);
3726
3727         if (flags & OBD_STATFS_NODELAY) {
3728                 /* procfs requests not want stat in wait for avoid deadlock */
3729                 req->rq_no_resend = 1;
3730                 req->rq_no_delay = 1;
3731         }
3732
3733         rc = ptlrpc_queue_wait(req);
3734         if (rc)
3735                 GOTO(out, rc);
3736
3737         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3738         if (msfs == NULL) {
3739                 GOTO(out, rc = -EPROTO);
3740         }
3741
3742         *osfs = *msfs;
3743
3744         EXIT;
3745  out:
3746         ptlrpc_req_finished(req);
3747         return rc;
3748 }
3749
3750 /* Retrieve object striping information.
3751  *
3752  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3753  * the maximum number of OST indices which will fit in the user buffer.
3754  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3755  */
3756 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3757 {
3758         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3759         struct lov_user_md_v3 lum, *lumk;
3760         struct lov_user_ost_data_v1 *lmm_objects;
3761         int rc = 0, lum_size;
3762         ENTRY;
3763
3764         if (!lsm)
3765                 RETURN(-ENODATA);
3766
3767         /* we only need the header part from user space to get lmm_magic and
3768          * lmm_stripe_count, (the header part is common to v1 and v3) */
3769         lum_size = sizeof(struct lov_user_md_v1);
3770         if (cfs_copy_from_user(&lum, lump, lum_size))
3771                 RETURN(-EFAULT);
3772
3773         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3774             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3775                 RETURN(-EINVAL);
3776
3777         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3778         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3779         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3780         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3781
3782         /* we can use lov_mds_md_size() to compute lum_size
3783          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3784         if (lum.lmm_stripe_count > 0) {
3785                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3786                 OBD_ALLOC(lumk, lum_size);
3787                 if (!lumk)
3788                         RETURN(-ENOMEM);
3789
3790                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3791                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3792                 else
3793                         lmm_objects = &(lumk->lmm_objects[0]);
3794                 lmm_objects->l_object_id = lsm->lsm_object_id;
3795         } else {
3796                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3797                 lumk = &lum;
3798         }
3799
3800         lumk->lmm_object_id = lsm->lsm_object_id;
3801         lumk->lmm_object_seq = lsm->lsm_object_seq;
3802         lumk->lmm_stripe_count = 1;
3803
3804         if (cfs_copy_to_user(lump, lumk, lum_size))
3805                 rc = -EFAULT;
3806
3807         if (lumk != &lum)
3808                 OBD_FREE(lumk, lum_size);
3809
3810         RETURN(rc);
3811 }
3812
3813
3814 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3815                          void *karg, void *uarg)
3816 {
3817         struct obd_device *obd = exp->exp_obd;
3818         struct obd_ioctl_data *data = karg;
3819         int err = 0;
3820         ENTRY;
3821
3822         if (!cfs_try_module_get(THIS_MODULE)) {
3823                 CERROR("Can't get module. Is it alive?");
3824                 return -EINVAL;
3825         }
3826         switch (cmd) {
3827         case OBD_IOC_LOV_GET_CONFIG: {
3828                 char *buf;
3829                 struct lov_desc *desc;
3830                 struct obd_uuid uuid;
3831
3832                 buf = NULL;
3833                 len = 0;
3834                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3835                         GOTO(out, err = -EINVAL);
3836
3837                 data = (struct obd_ioctl_data *)buf;
3838
3839                 if (sizeof(*desc) > data->ioc_inllen1) {
3840                         obd_ioctl_freedata(buf, len);
3841                         GOTO(out, err = -EINVAL);
3842                 }
3843
3844                 if (data->ioc_inllen2 < sizeof(uuid)) {
3845                         obd_ioctl_freedata(buf, len);
3846                         GOTO(out, err = -EINVAL);
3847                 }
3848
3849                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3850                 desc->ld_tgt_count = 1;
3851                 desc->ld_active_tgt_count = 1;
3852                 desc->ld_default_stripe_count = 1;
3853                 desc->ld_default_stripe_size = 0;
3854                 desc->ld_default_stripe_offset = 0;
3855                 desc->ld_pattern = 0;
3856                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3857
3858                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3859
3860                 err = cfs_copy_to_user((void *)uarg, buf, len);
3861                 if (err)
3862                         err = -EFAULT;
3863                 obd_ioctl_freedata(buf, len);
3864                 GOTO(out, err);
3865         }
3866         case LL_IOC_LOV_SETSTRIPE:
3867                 err = obd_alloc_memmd(exp, karg);
3868                 if (err > 0)
3869                         err = 0;
3870                 GOTO(out, err);
3871         case LL_IOC_LOV_GETSTRIPE:
3872                 err = osc_getstripe(karg, uarg);
3873                 GOTO(out, err);
3874         case OBD_IOC_CLIENT_RECOVER:
3875                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3876                                             data->ioc_inlbuf1);
3877                 if (err > 0)
3878                         err = 0;
3879                 GOTO(out, err);
3880         case IOC_OSC_SET_ACTIVE:
3881                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3882                                                data->ioc_offset);
3883                 GOTO(out, err);
3884         case OBD_IOC_POLL_QUOTACHECK:
3885                 err = lquota_poll_check(quota_interface, exp,
3886                                         (struct if_quotacheck *)karg);
3887                 GOTO(out, err);
3888         case OBD_IOC_PING_TARGET:
3889                 err = ptlrpc_obd_ping(obd);
3890                 GOTO(out, err);
3891         default:
3892                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3893                        cmd, cfs_curproc_comm());
3894                 GOTO(out, err = -ENOTTY);
3895         }
3896 out:
3897         cfs_module_put(THIS_MODULE);
3898         return err;
3899 }
3900
3901 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3902                         void *key, __u32 *vallen, void *val,
3903                         struct lov_stripe_md *lsm)
3904 {
3905         ENTRY;
3906         if (!vallen || !val)
3907                 RETURN(-EFAULT);
3908
3909         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3910                 __u32 *stripe = val;
3911                 *vallen = sizeof(*stripe);
3912                 *stripe = 0;
3913                 RETURN(0);
3914         } else if (KEY_IS(KEY_LAST_ID)) {
3915                 struct ptlrpc_request *req;
3916                 obd_id                *reply;
3917                 char                  *tmp;
3918                 int                    rc;
3919
3920                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3921                                            &RQF_OST_GET_INFO_LAST_ID);
3922                 if (req == NULL)
3923                         RETURN(-ENOMEM);
3924
3925                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3926                                      RCL_CLIENT, keylen);
3927                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3928                 if (rc) {
3929                         ptlrpc_request_free(req);
3930                         RETURN(rc);
3931                 }
3932
3933                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3934                 memcpy(tmp, key, keylen);
3935
3936                 req->rq_no_delay = req->rq_no_resend = 1;
3937                 ptlrpc_request_set_replen(req);
3938                 rc = ptlrpc_queue_wait(req);
3939                 if (rc)
3940                         GOTO(out, rc);
3941
3942                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3943                 if (reply == NULL)
3944                         GOTO(out, rc = -EPROTO);
3945
3946                 *((obd_id *)val) = *reply;
3947         out:
3948                 ptlrpc_req_finished(req);
3949                 RETURN(rc);
3950         } else if (KEY_IS(KEY_FIEMAP)) {
3951                 struct ptlrpc_request *req;
3952                 struct ll_user_fiemap *reply;
3953                 char *tmp;
3954                 int rc;
3955
3956                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3957                                            &RQF_OST_GET_INFO_FIEMAP);
3958                 if (req == NULL)
3959                         RETURN(-ENOMEM);
3960
3961                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3962                                      RCL_CLIENT, keylen);
3963                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3964                                      RCL_CLIENT, *vallen);
3965                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3966                                      RCL_SERVER, *vallen);
3967
3968                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3969                 if (rc) {
3970                         ptlrpc_request_free(req);
3971                         RETURN(rc);
3972                 }
3973
3974                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3975                 memcpy(tmp, key, keylen);
3976                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3977                 memcpy(tmp, val, *vallen);
3978
3979                 ptlrpc_request_set_replen(req);
3980                 rc = ptlrpc_queue_wait(req);
3981                 if (rc)
3982                         GOTO(out1, rc);
3983
3984                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3985                 if (reply == NULL)
3986                         GOTO(out1, rc = -EPROTO);
3987
3988                 memcpy(val, reply, *vallen);
3989         out1:
3990                 ptlrpc_req_finished(req);
3991
3992                 RETURN(rc);
3993         }
3994
3995         RETURN(-EINVAL);
3996 }
3997
3998 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
3999 {
4000         struct llog_ctxt *ctxt;
4001         int rc = 0;
4002         ENTRY;
4003
4004         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4005         if (ctxt) {
4006                 rc = llog_initiator_connect(ctxt);
4007                 llog_ctxt_put(ctxt);
4008         } else {
4009                 /* XXX return an error? skip setting below flags? */
4010         }
4011
4012         cfs_spin_lock(&imp->imp_lock);
4013         imp->imp_server_timeout = 1;
4014         imp->imp_pingable = 1;
4015         cfs_spin_unlock(&imp->imp_lock);
4016         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4017
4018         RETURN(rc);
4019 }
4020
4021 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4022                                           struct ptlrpc_request *req,
4023                                           void *aa, int rc)
4024 {
4025         ENTRY;
4026         if (rc != 0)
4027                 RETURN(rc);
4028
4029         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4030 }
4031
4032 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4033                               void *key, obd_count vallen, void *val,
4034                               struct ptlrpc_request_set *set)
4035 {
4036         struct ptlrpc_request *req;
4037         struct obd_device     *obd = exp->exp_obd;
4038         struct obd_import     *imp = class_exp2cliimp(exp);
4039         char                  *tmp;
4040         int                    rc;
4041         ENTRY;
4042
4043         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4044
4045         if (KEY_IS(KEY_NEXT_ID)) {
4046                 obd_id new_val;
4047                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4048
4049                 if (vallen != sizeof(obd_id))
4050                         RETURN(-ERANGE);
4051                 if (val == NULL)
4052                         RETURN(-EINVAL);
4053
4054                 if (vallen != sizeof(obd_id))
4055                         RETURN(-EINVAL);
4056
4057                 /* avoid race between allocate new object and set next id
4058                  * from ll_sync thread */
4059                 cfs_spin_lock(&oscc->oscc_lock);
4060                 new_val = *((obd_id*)val) + 1;
4061                 if (new_val > oscc->oscc_next_id)
4062                         oscc->oscc_next_id = new_val;
4063                 cfs_spin_unlock(&oscc->oscc_lock);
4064                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4065                        exp->exp_obd->obd_name,
4066                        obd->u.cli.cl_oscc.oscc_next_id);
4067
4068                 RETURN(0);
4069         }
4070
4071         if (KEY_IS(KEY_CHECKSUM)) {
4072                 if (vallen != sizeof(int))
4073                         RETURN(-EINVAL);
4074                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4075                 RETURN(0);
4076         }
4077
4078         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4079                 sptlrpc_conf_client_adapt(obd);
4080                 RETURN(0);
4081         }
4082
4083         if (KEY_IS(KEY_FLUSH_CTX)) {
4084                 sptlrpc_import_flush_my_ctx(imp);
4085                 RETURN(0);
4086         }
4087
4088         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4089                 RETURN(-EINVAL);
4090
4091         /* We pass all other commands directly to OST. Since nobody calls osc
4092            methods directly and everybody is supposed to go through LOV, we
4093            assume lov checked invalid values for us.
4094            The only recognised values so far are evict_by_nid and mds_conn.
4095            Even if something bad goes through, we'd get a -EINVAL from OST
4096            anyway. */
4097
4098         if (KEY_IS(KEY_GRANT_SHRINK))
4099                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4100         else
4101                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4102
4103         if (req == NULL)
4104                 RETURN(-ENOMEM);
4105
4106         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4107                              RCL_CLIENT, keylen);
4108         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4109                              RCL_CLIENT, vallen);
4110         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4111         if (rc) {
4112                 ptlrpc_request_free(req);
4113                 RETURN(rc);
4114         }
4115
4116         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4117         memcpy(tmp, key, keylen);
4118         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4119         memcpy(tmp, val, vallen);
4120
4121         if (KEY_IS(KEY_MDS_CONN)) {
4122                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4123
4124                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4125                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4126                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4127                 req->rq_no_delay = req->rq_no_resend = 1;
4128                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4129         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4130                 struct osc_grant_args *aa;
4131                 struct obdo *oa;
4132
4133                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4134                 aa = ptlrpc_req_async_args(req);
4135                 OBDO_ALLOC(oa);
4136                 if (!oa) {
4137                         ptlrpc_req_finished(req);
4138                         RETURN(-ENOMEM);
4139                 }
4140                 *oa = ((struct ost_body *)val)->oa;
4141                 aa->aa_oa = oa;
4142                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4143         }
4144
4145         ptlrpc_request_set_replen(req);
4146         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4147                 LASSERT(set != NULL);
4148                 ptlrpc_set_add_req(set, req);
4149                 ptlrpc_check_set(NULL, set);
4150         } else
4151                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4152
4153         RETURN(0);
4154 }
4155
4156
4157 static struct llog_operations osc_size_repl_logops = {
4158         lop_cancel: llog_obd_repl_cancel
4159 };
4160
4161 static struct llog_operations osc_mds_ost_orig_logops;
4162
4163 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4164                            struct obd_device *tgt, struct llog_catid *catid)
4165 {
4166         int rc;
4167         ENTRY;
4168
4169         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4170                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4171         if (rc) {
4172                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4173                 GOTO(out, rc);
4174         }
4175
4176         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4177                         NULL, &osc_size_repl_logops);
4178         if (rc) {
4179                 struct llog_ctxt *ctxt =
4180                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4181                 if (ctxt)
4182                         llog_cleanup(ctxt);
4183                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4184         }
4185         GOTO(out, rc);
4186 out:
4187         if (rc) {
4188                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4189                        obd->obd_name, tgt->obd_name, catid, rc);
4190                 CERROR("logid "LPX64":0x%x\n",
4191                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4192         }
4193         return rc;
4194 }
4195
4196 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4197                          struct obd_device *disk_obd, int *index)
4198 {
4199         struct llog_catid catid;
4200         static char name[32] = CATLIST;
4201         int rc;
4202         ENTRY;
4203
4204         LASSERT(olg == &obd->obd_olg);
4205
4206         cfs_mutex_down(&olg->olg_cat_processing);
4207         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4208         if (rc) {
4209                 CERROR("rc: %d\n", rc);
4210                 GOTO(out, rc);
4211         }
4212
4213         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4214                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4215                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4216
4217         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4218         if (rc) {
4219                 CERROR("rc: %d\n", rc);
4220                 GOTO(out, rc);
4221         }
4222
4223         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4224         if (rc) {
4225                 CERROR("rc: %d\n", rc);
4226                 GOTO(out, rc);
4227         }
4228
4229  out:
4230         cfs_mutex_up(&olg->olg_cat_processing);
4231
4232         return rc;
4233 }
4234
4235 static int osc_llog_finish(struct obd_device *obd, int count)
4236 {
4237         struct llog_ctxt *ctxt;
4238         int rc = 0, rc2 = 0;
4239         ENTRY;
4240
4241         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4242         if (ctxt)
4243                 rc = llog_cleanup(ctxt);
4244
4245         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4246         if (ctxt)
4247                 rc2 = llog_cleanup(ctxt);
4248         if (!rc)
4249                 rc = rc2;
4250
4251         RETURN(rc);
4252 }
4253
4254 static int osc_reconnect(const struct lu_env *env,
4255                          struct obd_export *exp, struct obd_device *obd,
4256                          struct obd_uuid *cluuid,
4257                          struct obd_connect_data *data,
4258                          void *localdata)
4259 {
4260         struct client_obd *cli = &obd->u.cli;
4261
4262         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4263                 long lost_grant;
4264
4265                 client_obd_list_lock(&cli->cl_loi_list_lock);
4266                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4267                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4268                 lost_grant = cli->cl_lost_grant;
4269                 cli->cl_lost_grant = 0;
4270                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4271
4272                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4273                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4274                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4275                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4276                        " ocd_grant: %d\n", data->ocd_connect_flags,
4277                        data->ocd_version, data->ocd_grant);
4278         }
4279
4280         RETURN(0);
4281 }
4282
4283 static int osc_disconnect(struct obd_export *exp)
4284 {
4285         struct obd_device *obd = class_exp2obd(exp);
4286         struct llog_ctxt  *ctxt;
4287         int rc;
4288
4289         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4290         if (ctxt) {
4291                 if (obd->u.cli.cl_conn_count == 1) {
4292                         /* Flush any remaining cancel messages out to the
4293                          * target */
4294                         llog_sync(ctxt, exp);
4295                 }
4296                 llog_ctxt_put(ctxt);
4297         } else {
4298                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4299                        obd);
4300         }
4301
4302         rc = client_disconnect_export(exp);
4303         /**
4304          * Initially we put del_shrink_grant before disconnect_export, but it
4305          * causes the following problem if setup (connect) and cleanup
4306          * (disconnect) are tangled together.
4307          *      connect p1                     disconnect p2
4308          *   ptlrpc_connect_import
4309          *     ...............               class_manual_cleanup
4310          *                                     osc_disconnect
4311          *                                     del_shrink_grant
4312          *   ptlrpc_connect_interrupt
4313          *     init_grant_shrink
4314          *   add this client to shrink list
4315          *                                      cleanup_osc
4316          * Bang! pinger trigger the shrink.
4317          * So the osc should be disconnected from the shrink list, after we
4318          * are sure the import has been destroyed. BUG18662
4319          */
4320         if (obd->u.cli.cl_import == NULL)
4321                 osc_del_shrink_grant(&obd->u.cli);
4322         return rc;
4323 }
4324
4325 static int osc_import_event(struct obd_device *obd,
4326                             struct obd_import *imp,
4327                             enum obd_import_event event)
4328 {
4329         struct client_obd *cli;
4330         int rc = 0;
4331
4332         ENTRY;
4333         LASSERT(imp->imp_obd == obd);
4334
4335         switch (event) {
4336         case IMP_EVENT_DISCON: {
4337                 /* Only do this on the MDS OSC's */
4338                 if (imp->imp_server_timeout) {
4339                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4340
4341                         cfs_spin_lock(&oscc->oscc_lock);
4342                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4343                         cfs_spin_unlock(&oscc->oscc_lock);
4344                 }
4345                 cli = &obd->u.cli;
4346                 client_obd_list_lock(&cli->cl_loi_list_lock);
4347                 cli->cl_avail_grant = 0;
4348                 cli->cl_lost_grant = 0;
4349                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4350                 break;
4351         }
4352         case IMP_EVENT_INACTIVE: {
4353                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4354                 break;
4355         }
4356         case IMP_EVENT_INVALIDATE: {
4357                 struct ldlm_namespace *ns = obd->obd_namespace;
4358                 struct lu_env         *env;
4359                 int                    refcheck;
4360
4361                 env = cl_env_get(&refcheck);
4362                 if (!IS_ERR(env)) {
4363                         /* Reset grants */
4364                         cli = &obd->u.cli;
4365                         client_obd_list_lock(&cli->cl_loi_list_lock);
4366                         /* all pages go to failing rpcs due to the invalid
4367                          * import */
4368                         osc_check_rpcs(env, cli);
4369                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4370
4371                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4372                         cl_env_put(env, &refcheck);
4373                 } else
4374                         rc = PTR_ERR(env);
4375                 break;
4376         }
4377         case IMP_EVENT_ACTIVE: {
4378                 /* Only do this on the MDS OSC's */
4379                 if (imp->imp_server_timeout) {
4380                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4381
4382                         cfs_spin_lock(&oscc->oscc_lock);
4383                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4384                         cfs_spin_unlock(&oscc->oscc_lock);
4385                 }
4386                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4387                 break;
4388         }
4389         case IMP_EVENT_OCD: {
4390                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4391
4392                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4393                         osc_init_grant(&obd->u.cli, ocd);
4394
4395                 /* See bug 7198 */
4396                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4397                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4398
4399                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4400                 break;
4401         }
4402         case IMP_EVENT_DEACTIVATE: {
4403                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4404                 break;
4405         }
4406         case IMP_EVENT_ACTIVATE: {
4407                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4408                 break;
4409         }
4410         default:
4411                 CERROR("Unknown import event %d\n", event);
4412                 LBUG();
4413         }
4414         RETURN(rc);
4415 }
4416
4417 /**
4418  * Determine whether the lock can be canceled before replaying the lock
4419  * during recovery, see bug16774 for detailed information.
4420  *
4421  * \retval zero the lock can't be canceled
4422  * \retval other ok to cancel
4423  */
4424 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4425 {
4426         check_res_locked(lock->l_resource);
4427
4428         /*
4429          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4430          *
4431          * XXX as a future improvement, we can also cancel unused write lock
4432          * if it doesn't have dirty data and active mmaps.
4433          */
4434         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4435             (lock->l_granted_mode == LCK_PR ||
4436              lock->l_granted_mode == LCK_CR) &&
4437             (osc_dlm_lock_pageref(lock) == 0))
4438                 RETURN(1);
4439
4440         RETURN(0);
4441 }
4442
4443 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4444 {
4445         int rc;
4446         ENTRY;
4447
4448         ENTRY;
4449         rc = ptlrpcd_addref();
4450         if (rc)
4451                 RETURN(rc);
4452
4453         rc = client_obd_setup(obd, lcfg);
4454         if (rc) {
4455                 ptlrpcd_decref();
4456         } else {
4457                 struct lprocfs_static_vars lvars = { 0 };
4458                 struct client_obd *cli = &obd->u.cli;
4459
4460                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4461                 lprocfs_osc_init_vars(&lvars);
4462                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4463                         lproc_osc_attach_seqstat(obd);
4464                         sptlrpc_lprocfs_cliobd_attach(obd);
4465                         ptlrpc_lprocfs_register_obd(obd);
4466                 }
4467
4468                 oscc_init(obd);
4469                 /* We need to allocate a few requests more, because
4470                    brw_interpret tries to create new requests before freeing
4471                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4472                    reserved, but I afraid that might be too much wasted RAM
4473                    in fact, so 2 is just my guess and still should work. */
4474                 cli->cl_import->imp_rq_pool =
4475                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4476                                             OST_MAXREQSIZE,
4477                                             ptlrpc_add_rqs_to_pool);
4478
4479                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4480                 cfs_sema_init(&cli->cl_grant_sem, 1);
4481
4482                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4483         }
4484
4485         RETURN(rc);
4486 }
4487
4488 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4489 {
4490         int rc = 0;
4491         ENTRY;
4492
4493         switch (stage) {
4494         case OBD_CLEANUP_EARLY: {
4495                 struct obd_import *imp;
4496                 imp = obd->u.cli.cl_import;
4497                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4498                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4499                 ptlrpc_deactivate_import(imp);
4500                 cfs_spin_lock(&imp->imp_lock);
4501                 imp->imp_pingable = 0;
4502                 cfs_spin_unlock(&imp->imp_lock);
4503                 break;
4504         }
4505         case OBD_CLEANUP_EXPORTS: {
4506                 /* If we set up but never connected, the
4507                    client import will not have been cleaned. */
4508                 if (obd->u.cli.cl_import) {
4509                         struct obd_import *imp;
4510                         cfs_down_write(&obd->u.cli.cl_sem);
4511                         imp = obd->u.cli.cl_import;
4512                         CDEBUG(D_CONFIG, "%s: client import never connected\n",
4513                                obd->obd_name);
4514                         ptlrpc_invalidate_import(imp);
4515                         if (imp->imp_rq_pool) {
4516                                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
4517                                 imp->imp_rq_pool = NULL;
4518                         }
4519                         class_destroy_import(imp);
4520                         cfs_up_write(&obd->u.cli.cl_sem);
4521                         obd->u.cli.cl_import = NULL;
4522                 }
4523                 rc = obd_llog_finish(obd, 0);
4524                 if (rc != 0)
4525                         CERROR("failed to cleanup llogging subsystems\n");
4526                 break;
4527                 }
4528         }
4529         RETURN(rc);
4530 }
4531
4532 int osc_cleanup(struct obd_device *obd)
4533 {
4534         int rc;
4535
4536         ENTRY;
4537         ptlrpc_lprocfs_unregister_obd(obd);
4538         lprocfs_obd_cleanup(obd);
4539
4540         /* free memory of osc quota cache */
4541         lquota_cleanup(quota_interface, obd);
4542
4543         rc = client_obd_cleanup(obd);
4544
4545         ptlrpcd_decref();
4546         RETURN(rc);
4547 }
4548
4549 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4550 {
4551         struct lprocfs_static_vars lvars = { 0 };
4552         int rc = 0;
4553
4554         lprocfs_osc_init_vars(&lvars);
4555
4556         switch (lcfg->lcfg_command) {
4557         default:
4558                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4559                                               lcfg, obd);
4560                 if (rc > 0)
4561                         rc = 0;
4562                 break;
4563         }
4564
4565         return(rc);
4566 }
4567
4568 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4569 {
4570         return osc_process_config_base(obd, buf);
4571 }
4572
4573 struct obd_ops osc_obd_ops = {
4574         .o_owner                = THIS_MODULE,
4575         .o_setup                = osc_setup,
4576         .o_precleanup           = osc_precleanup,
4577         .o_cleanup              = osc_cleanup,
4578         .o_add_conn             = client_import_add_conn,
4579         .o_del_conn             = client_import_del_conn,
4580         .o_connect              = client_connect_import,
4581         .o_reconnect            = osc_reconnect,
4582         .o_disconnect           = osc_disconnect,
4583         .o_statfs               = osc_statfs,
4584         .o_statfs_async         = osc_statfs_async,
4585         .o_packmd               = osc_packmd,
4586         .o_unpackmd             = osc_unpackmd,
4587         .o_precreate            = osc_precreate,
4588         .o_create               = osc_create,
4589         .o_create_async         = osc_create_async,
4590         .o_destroy              = osc_destroy,
4591         .o_getattr              = osc_getattr,
4592         .o_getattr_async        = osc_getattr_async,
4593         .o_setattr              = osc_setattr,
4594         .o_setattr_async        = osc_setattr_async,
4595         .o_brw                  = osc_brw,
4596         .o_punch                = osc_punch,
4597         .o_sync                 = osc_sync,
4598         .o_enqueue              = osc_enqueue,
4599         .o_change_cbdata        = osc_change_cbdata,
4600         .o_find_cbdata          = osc_find_cbdata,
4601         .o_cancel               = osc_cancel,
4602         .o_cancel_unused        = osc_cancel_unused,
4603         .o_iocontrol            = osc_iocontrol,
4604         .o_get_info             = osc_get_info,
4605         .o_set_info_async       = osc_set_info_async,
4606         .o_import_event         = osc_import_event,
4607         .o_llog_init            = osc_llog_init,
4608         .o_llog_finish          = osc_llog_finish,
4609         .o_process_config       = osc_process_config,
4610 };
4611
4612 extern struct lu_kmem_descr osc_caches[];
4613 extern cfs_spinlock_t       osc_ast_guard;
4614 extern cfs_lock_class_key_t osc_ast_guard_class;
4615
4616 int __init osc_init(void)
4617 {
4618         struct lprocfs_static_vars lvars = { 0 };
4619         int rc;
4620         ENTRY;
4621
4622         /* print an address of _any_ initialized kernel symbol from this
4623          * module, to allow debugging with gdb that doesn't support data
4624          * symbols from modules.*/
4625         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4626
4627         rc = lu_kmem_init(osc_caches);
4628
4629         lprocfs_osc_init_vars(&lvars);
4630
4631         cfs_request_module("lquota");
4632         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4633         lquota_init(quota_interface);
4634         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4635
4636         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4637                                  LUSTRE_OSC_NAME, &osc_device_type);
4638         if (rc) {
4639                 if (quota_interface)
4640                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4641                 lu_kmem_fini(osc_caches);
4642                 RETURN(rc);
4643         }
4644
4645         cfs_spin_lock_init(&osc_ast_guard);
4646         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4647
4648         osc_mds_ost_orig_logops = llog_lvfs_ops;
4649         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4650         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4651         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4652         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4653
4654         RETURN(rc);
4655 }
4656
4657 #ifdef __KERNEL__
4658 static void /*__exit*/ osc_exit(void)
4659 {
4660         lu_device_type_fini(&osc_device_type);
4661
4662         lquota_exit(quota_interface);
4663         if (quota_interface)
4664                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4665
4666         class_unregister_type(LUSTRE_OSC_NAME);
4667         lu_kmem_fini(osc_caches);
4668 }
4669
4670 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4671 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4672 MODULE_LICENSE("GPL");
4673
4674 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4675 #endif