Whamcloud - gitweb
ef3cc259ddd9702e26782b6b889da10649367a0c
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osp/osp_sync.c
33  *
34  * Lustre OST Proxy Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <linux/kthread.h>
43 #include <lustre_log.h>
44 #include <lustre_update.h>
45 #include "osp_internal.h"
46
47 static int osp_sync_id_traction_init(struct osp_device *d);
48 static void osp_sync_id_traction_fini(struct osp_device *d);
49 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
50 static void osp_sync_remove_from_tracker(struct osp_device *d);
51
52 /*
 * this is a component of OSP implementing synchronization between MDS and OST
 * it llogs all interesting changes (currently it's uid/gid change and object
 * destroy) atomically, then makes sure changes hit OST storage
56  *
57  * we have 4 queues of work:
58  *
59  * the first queue is llog itself, once read a change is stored in 2nd queue
60  * in form of RPC (but RPC isn't fired yet).
61  *
62  * the second queue (opd_syn_waiting_for_commit) holds changes awaiting local
63  * commit. once change is committed locally it migrates onto 3rd queue.
64  *
65  * the third queue (opd_syn_committed_here) holds changes committed locally,
66  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
67  * we take a change from the queue and fire corresponded RPC.
68  *
69  * once RPC is reported committed by OST (using regular last_committed mech.)
70  * the change jumps into 4th queue (opd_syn_committed_there), now we can
71  * cancel corresponded llog record and release RPC
72  *
73  * opd_syn_changes is a number of unread llog records (to be processed).
74  * notice this number doesn't include llog records from previous boots.
75  * with OSP_SYN_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
76  *
77  * opd_syn_rpc_in_progress is a number of requests in 2-4 queues.
78  * we control this with OSP_MAX_IN_PROGRESS so that OSP don't consume
79  * too much memory -- how to deal with 1000th OSTs ? batching could help?
80  *
81  * opd_syn_rpc_in_flight is a number of RPC in flight.
82  * we control this with OSP_MAX_IN_FLIGHT
83  */
84
85 /* XXX: do math to learn reasonable threshold
86  * should it be ~ number of changes fitting bulk? */
87
88 #define OSP_SYN_THRESHOLD       10
89 #define OSP_MAX_IN_FLIGHT       8
90 #define OSP_MAX_IN_PROGRESS     4096
91
92 #define OSP_JOB_MAGIC           0x26112005
93
/* per-RPC job state, embedded in ptlrpc_request::rq_async_args */
struct osp_job_req_args {
	/** bytes reserved for ptlrpc_replay_req() */
	struct ptlrpc_replay_async_args jra_raa;
	/* linkage into opd_syn_committed_there once the target reports
	 * the change committed (see osp_sync_request_commit_cb()) */
	struct list_head                jra_committed_link;
	/* linkage into opd_syn_inflight_list while the RPC is in flight,
	 * used by osp_sync_inflight_conflict() */
	struct list_head                jra_inflight_link;
	/* cookie of the llog record this RPC applies, for later cancel */
	struct llog_cookie              jra_lcookie;
	/* OSP_JOB_MAGIC, sanity check */
	__u32                           jra_magic;
};
102
/**
 * Check status: whether OSP sync thread is running
 *
 * \param[in] d		OSP device
 *
 * \retval 1		running
 * \retval 0		not running
 */
static inline int osp_sync_running(struct osp_device *d)
{
	return !!(d->opd_syn_thread.t_flags & SVC_RUNNING);
}
107
108 /**
109  * Check status: whether OSP thread has stopped
110  *
111  * \param[in] d         OSP device
112  *
113  * \retval 0            still running
114  * \retval 1            stopped
115  */
116 static inline int osp_sync_stopped(struct osp_device *d)
117 {
118         return !!(d->opd_syn_thread.t_flags & SVC_STOPPED);
119 }
120
/**
 * Check for new changes to sync
123  *
124  * \param[in] d         OSP device
125  *
126  * \retval 1            there are changes
127  * \retval 0            there are no changes
128  */
129 static inline int osp_sync_has_new_job(struct osp_device *d)
130 {
131         return ((d->opd_syn_last_processed_id < d->opd_syn_last_used_id) &&
132                 (d->opd_syn_last_processed_id < d->opd_syn_last_committed_id))
133                 || (d->opd_syn_prev_done == 0);
134 }
135
/**
 * Check whether a llog record conflicts with an in-flight RPC.
 *
 * Extracts the target OST object id from record \a h and scans the list
 * of in-flight jobs under opd_syn_lock; if some RPC already in flight
 * addresses the same object, the record must not be processed yet so
 * that per-object ordering is preserved.
 *
 * \param[in] d		OSP device
 * \param[in] h		llog record to check, may be NULL
 *
 * \retval 1		conflict (or invalid record), delay processing
 * \retval 0		no conflict
 */
static inline int osp_sync_inflight_conflict(struct osp_device *d,
					     struct llog_rec_hdr *h)
{
	struct osp_job_req_args	*jra;
	struct ost_id		 ostid;
	int			 conflict = 0;

	/* LLOG_GEN_REC carries no object id, it can not conflict */
	if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
	    list_empty(&d->opd_syn_inflight_list))
		return conflict;

	memset(&ostid, 0, sizeof(ostid));
	switch (h->lrh_type) {
	case MDS_UNLINK_REC: {
		struct llog_unlink_rec *unlink = (struct llog_unlink_rec *)h;

		ostid_set_seq(&ostid, unlink->lur_oseq);
		if (ostid_set_id(&ostid, unlink->lur_oid)) {
			CERROR("Bad %llu to set " DOSTID "\n",
			       (unsigned long long)(unlink->lur_oid),
			       POSTID(&ostid));
			/* treat an unpackable id as a conflict so the
			 * record is not sent with a bogus target */
			return 1;
		}
		}
		break;
	case MDS_UNLINK64_REC:
		fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
		break;
	case MDS_SETATTR64_REC:
		ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
		break;
	default:
		LBUG();
	}

	spin_lock(&d->opd_syn_lock);
	list_for_each_entry(jra, &d->opd_syn_inflight_list, jra_inflight_link) {
		struct ptlrpc_request	*req;
		struct ost_body		*body;

		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);

		/* jra is embedded in rq_async_args of its request */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);

		if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
			conflict = 1;
			break;
		}
	}
	spin_unlock(&d->opd_syn_lock);

	return conflict;
}
193
/**
 * Check for room in the in-progress queue (queues 2-4, see the top comment)
 *
 * \param[in] d		OSP device
 *
 * \retval 1		there is room
 * \retval 0		no room, too many RPCs in progress
 */
static inline int osp_sync_low_in_progress(struct osp_device *d)
{
	return atomic_read(&d->opd_syn_rpc_in_progress) <
		d->opd_syn_max_rpc_in_progress;
}
199
200 /**
201  * Check for room in the network pipe to OST
202  *
203  * \param[in] d         OSP device
204  *
205  * \retval 1            there is room
206  * \retval 0            no room, the pipe is full
207  */
208 static inline int osp_sync_low_in_flight(struct osp_device *d)
209 {
210         return atomic_read(&d->opd_syn_rpc_in_flight) <
211                 d->opd_syn_max_rpc_in_flight;
212 }
213
214 /**
215  * Wake up check for the main sync thread
216  *
217  * \param[in] d         OSP device
218  *
219  * \retval 1            time to wake up
220  * \retval 0            no need to wake up
221  */
222 static inline int osp_sync_has_work(struct osp_device *d)
223 {
224         /* has new/old changes and low in-progress? */
225         if (osp_sync_has_new_job(d) && osp_sync_low_in_progress(d) &&
226             osp_sync_low_in_flight(d) && d->opd_imp_connected)
227                 return 1;
228
229         /* has remotely committed? */
230         if (!list_empty(&d->opd_syn_committed_there))
231                 return 1;
232
233         return 0;
234 }
235
/* wake the sync thread if there is work for it; wrapped in
 * do { } while (0) so the macro expands safely as a single
 * statement (e.g. in an unbraced if/else) */
#define osp_sync_check_for_work(d)                      \
do {                                                    \
	if (osp_sync_has_work(d))                       \
		wake_up(&d->opd_syn_waitq);             \
} while (0)
242
/* externally visible wrapper around the check-for-work macro, so other
 * files can trigger the wakeup check without seeing the macro */
void __osp_sync_check_for_work(struct osp_device *d)
{
	osp_sync_check_for_work(d);
}
247
248 static inline __u64 osp_sync_correct_id(struct osp_device *d,
249                                         struct llog_rec_hdr *rec)
250 {
251         /*
252          * llog use cyclic store with 32 bit lrh_id
253          * so overflow lrh_id is possible. Range between
254          * last_processed and last_committed is less than
255          * 64745 ^ 2 and less than 2^32 - 1
256          */
257         __u64 correct_id = d->opd_syn_last_committed_id;
258
259         if ((correct_id & 0xffffffffULL) < rec->lrh_id)
260                 correct_id -= 0x100000000ULL;
261
262         correct_id &= ~0xffffffffULL;
263         correct_id |= rec->lrh_id;
264
265         return correct_id;
266 }
267 /**
268  * Check and return ready-for-new status.
269  *
270  * The thread processing llog record uses this function to check whether
271  * it's time to take another record and process it. The number of conditions
272  * must be met: the connection should be ready, RPCs in flight not exceeding
273  * the limit, the record is committed locally, etc (see the lines below).
274  *
275  * \param[in] d         OSP device
276  * \param[in] rec       next llog record to process
277  *
278  * \retval 0            not ready
279  * \retval 1            ready
280  */
281 static inline int osp_sync_can_process_new(struct osp_device *d,
282                                            struct llog_rec_hdr *rec)
283 {
284         LASSERT(d);
285
286         if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
287                 return 0;
288         if (unlikely(osp_sync_inflight_conflict(d, rec)))
289                 return 0;
290         if (!osp_sync_low_in_progress(d))
291                 return 0;
292         if (!osp_sync_low_in_flight(d))
293                 return 0;
294         if (!d->opd_imp_connected)
295                 return 0;
296         if (d->opd_syn_prev_done == 0)
297                 return 1;
298         if (atomic_read(&d->opd_syn_changes) == 0)
299                 return 0;
300         if (rec == NULL ||
301             osp_sync_correct_id(d, rec) <= d->opd_syn_last_committed_id)
302                 return 1;
303         return 0;
304 }
305
306 /**
307  * Declare intention to add a new change.
308  *
309  * With regard to OSD API, we have to declare any changes ahead. In this
310  * case we declare an intention to add a llog record representing the
311  * change on the local storage.
312  *
313  * \param[in] env       LU environment provided by the caller
314  * \param[in] o         OSP object
315  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
316  * \param[in] th        transaction handle (local)
317  *
318  * \retval 0            on success
319  * \retval negative     negated errno on error
320  */
321 int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
322                          llog_op_type type, struct thandle *th)
323 {
324         struct osp_thread_info  *osi = osp_env_info(env);
325         struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
326         struct llog_ctxt        *ctxt;
327         struct thandle          *storage_th;
328         int                      rc;
329
330         ENTRY;
331
332         /* it's a layering violation, to access internals of th,
333          * but we can do this as a sanity check, for a while */
334         LASSERT(th->th_top != NULL);
335         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
336         if (IS_ERR(storage_th))
337                 RETURN(PTR_ERR(storage_th));
338
339         switch (type) {
340         case MDS_UNLINK64_REC:
341                 osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
342                 break;
343         case MDS_SETATTR64_REC:
344                 osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec_v2);
345                 break;
346         default:
347                 LBUG();
348         }
349
350         /* we want ->dt_trans_start() to allocate per-thandle structure */
351         storage_th->th_tags |= LCT_OSP_THREAD;
352
353         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
354         LASSERT(ctxt);
355
356         rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
357                               storage_th);
358         llog_ctxt_put(ctxt);
359
360         RETURN(rc);
361 }
362
363 /**
364  * Generate a llog record for a given change.
365  *
366  * Generates a llog record for the change passed. The change can be of two
367  * types: unlink and setattr. The record gets an ID which later will be
368  * used to track commit status of the change. For unlink changes, the caller
369  * can supply a starting FID and the count of the objects to destroy. For
370  * setattr the caller should apply attributes to apply.
371  *
372  *
373  * \param[in] env       LU environment provided by the caller
374  * \param[in] d         OSP device
375  * \param[in] fid       fid of the object the change should be applied to
376  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
377  * \param[in] count     count of objects to destroy
378  * \param[in] th        transaction handle (local)
379  * \param[in] attr      attributes for setattr
380  *
381  * \retval 0            on success
382  * \retval negative     negated errno on error
383  */
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
			    const struct lu_fid *fid, llog_op_type type,
			    int count, struct thandle *th,
			    const struct lu_attr *attr)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct llog_ctxt	*ctxt;
	struct osp_txn_info	*txn;
	struct thandle		*storage_th;
	int			 rc;

	ENTRY;

	/* it's a layering violation, to access internals of th,
	 * but we can do this as a sanity check, for a while */
	LASSERT(th->th_top != NULL);
	storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
	if (IS_ERR(storage_th))
		RETURN(PTR_ERR(storage_th));

	/* fill the type-specific part of the record */
	switch (type) {
	case MDS_UNLINK64_REC:
		osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
		osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
		osi->osi_unlink.lur_fid  = *fid;
		osi->osi_unlink.lur_count = count;
		break;
	case MDS_SETATTR64_REC:
		rc = fid_to_ostid(fid, &osi->osi_oi);
		LASSERT(rc == 0);
		osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
		osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
		osi->osi_setattr.lsr_oi  = osi->osi_oi;
		LASSERT(attr);
		osi->osi_setattr.lsr_uid = attr->la_uid;
		osi->osi_setattr.lsr_gid = attr->la_gid;
		osi->osi_setattr.lsr_projid = attr->la_projid;
		/* record which of uid/gid/projid are actually valid */
		osi->osi_setattr.lsr_valid =
			((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
			((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
			((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
		break;
	default:
		LBUG();
	}

	txn = osp_txn_info(&storage_th->th_ctx);
	LASSERT(txn);

	/* get a per-transaction id; only its low 32 bits fit in lrh_id,
	 * osp_sync_correct_id() recovers the full value later */
	txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
	osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
	ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL)
		RETURN(-ENOMEM);

	rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
		      storage_th);
	llog_ctxt_put(ctxt);

	if (likely(rc >= 0)) {
		CDEBUG(D_OTHER, "%s: new record "DFID":%x.%u: rc = %d\n",
		       d->opd_obd->obd_name,
		       PFID(&osi->osi_cookie.lgc_lgl.lgl_oi.oi_fid),
		       osi->osi_cookie.lgc_lgl.lgl_ogen,
		       osi->osi_cookie.lgc_index, rc);
		atomic_inc(&d->opd_syn_changes);
	}
	/* return 0 always here, error case just cause no llog record */
	RETURN(0);
}
454
455 int osp_sync_add(const struct lu_env *env, struct osp_object *o,
456                  llog_op_type type, struct thandle *th,
457                  const struct lu_attr *attr)
458 {
459         return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
460                                 lu_object_fid(&o->opo_obj.do_lu), type, 1,
461                                 th, attr);
462 }
463
/* log an unlink record covering @lost objects starting at @fid,
 * presumably so a gap in the object sequence gets destroyed on the
 * OST side - see callers to confirm */
int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
			struct lu_fid *fid, int lost, struct thandle *th)
{
	return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
}
469
470 /*
471  * it's quite obvious we can't maintain all the structures in the memory:
472  * while OST is down, MDS can be processing thousands and thousands of unlinks
 * filling persistent llogs and in-core representation
474  *
475  * this doesn't scale at all. so we need basically the following:
476  * a) destroy/setattr append llog records
477  * b) once llog has grown to X records, we process first Y committed records
478  *
479  *  once record R is found via llog_process(), it becomes committed after any
480  *  subsequent commit callback (at the most)
481  */
482
483 /**
484  * ptlrpc commit callback.
485  *
486  * The callback is called by PTLRPC when a RPC is reported committed by the
 * target (OST). We register the callback for every RPC applying a change
 * from the llog. This way we know when the llog records can be cancelled.
489  * Notice the callback can be called when OSP is finishing. We can detect this
490  * checking that actual transno in the request is less or equal of known
491  * committed transno (see osp_sync_process_committed() for the details).
492  * XXX: this is pretty expensive and can be improved later using batching.
493  *
494  * \param[in] req       request
495  */
static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra;

	CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);

	/* transno 0 means the change was not applied, nothing to track */
	if (unlikely(req->rq_transno == 0))
		return;

	/* do not do any opd_dyn_rpc_* accounting here
	 * it's done in osp_sync_interpret sooner or later */
	LASSERT(d);

	jra = ptlrpc_req_async_args(req);
	LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
	LASSERT(list_empty(&jra->jra_committed_link));

	/* hold a reference until the sync thread consumes the request
	 * from opd_syn_committed_there */
	ptlrpc_request_addref(req);

	spin_lock(&d->opd_syn_lock);
	list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
	spin_unlock(&d->opd_syn_lock);

	/* XXX: some batching wouldn't hurt */
	wake_up(&d->opd_syn_waitq);
}
523
524 /**
525  * RPC interpretation callback.
526  *
527  * The callback is called by ptlrpc when RPC is replied. Now we have to decide
528  * whether we should:
529  *  - put request on a special list to wait until it's committed by the target,
530  *    if the request is successful
531  *  - schedule llog record cancel if no target object is found
532  *  - try later (essentially after reboot) in case of unexpected error
533  *
534  * \param[in] env       LU environment provided by the caller
535  * \param[in] req       request replied
536  * \param[in] aa        callback data
537  * \param[in] rc        result of RPC
538  *
539  * \retval 0            always
540  */
static int osp_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *aa, int rc)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra = aa;

	if (jra->jra_magic != OSP_JOB_MAGIC) {
		DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
		LBUG();
	}
	LASSERT(d);

	CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
	       atomic_read(&req->rq_refcount),
	       rc, (unsigned) req->rq_transno);
	LASSERT(rc || req->rq_transno);

	if (rc == -ENOENT) {
		/*
		 * we tried to destroy object or update attributes,
		 * but object doesn't exist anymore - cancel llog record
		 */
		LASSERT(req->rq_transno == 0);
		LASSERT(list_empty(&jra->jra_committed_link));

		/* queue the request as if it had committed, so the sync
		 * thread cancels the llog record; hold a ref until then */
		ptlrpc_request_addref(req);

		spin_lock(&d->opd_syn_lock);
		list_add(&jra->jra_committed_link, &d->opd_syn_committed_there);
		spin_unlock(&d->opd_syn_lock);

		wake_up(&d->opd_syn_waitq);
	} else if (rc) {
		struct obd_import *imp = req->rq_import;
		/*
		 * error happened, we'll try to repeat on next boot ?
		 */
		LASSERTF(req->rq_transno == 0 ||
			 req->rq_import_generation < imp->imp_generation,
			 "transno %llu, rc %d, gen: req %d, imp %d\n",
			 req->rq_transno, rc, req->rq_import_generation,
			 imp->imp_generation);
		if (req->rq_transno == 0) {
			/* this is the last time we see the request
			 * if transno is not zero, then commit cb
			 * will be called at some point */
			LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) > 0);
			atomic_dec(&d->opd_syn_rpc_in_progress);
		}

		wake_up(&d->opd_syn_waitq);
	} else if (d->opd_pre != NULL &&
		   unlikely(d->opd_pre_status == -ENOSPC)) {
		/*
		 * if current status is -ENOSPC (lack of free space on OST)
		 * then we should poll OST immediately once object destroy
		 * is replied
		 */
		osp_statfs_need_now(d);
	}

	/* the RPC is no longer in flight in any case */
	spin_lock(&d->opd_syn_lock);
	list_del_init(&jra->jra_inflight_link);
	spin_unlock(&d->opd_syn_lock);
	LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) > 0);
	atomic_dec(&d->opd_syn_rpc_in_flight);
	if (unlikely(atomic_read(&d->opd_syn_barrier) > 0))
		wake_up(&d->opd_syn_barrier_waitq);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	osp_sync_check_for_work(d);

	return 0;
}
617
/**
 * Add request to ptlrpc queue.
620  *
621  * This is just a tiny helper function to put the request on the sending list
622  *
623  * \param[in] d         OSP device
624  * \param[in] llh       llog handle where the record is stored
625  * \param[in] h         llog record
626  * \param[in] req       request
627  */
static void osp_sync_send_new_rpc(struct osp_device *d,
				  struct llog_handle *llh,
				  struct llog_rec_hdr *h,
				  struct ptlrpc_request *req)
{
	struct osp_job_req_args *jra;

	LASSERT(atomic_read(&d->opd_syn_rpc_in_flight) <=
		d->opd_syn_max_rpc_in_flight);

	/* stash the llog cookie in the per-request job args so the
	 * record can be cancelled once the change is committed */
	jra = ptlrpc_req_async_args(req);
	jra->jra_magic = OSP_JOB_MAGIC;
	jra->jra_lcookie.lgc_lgl = llh->lgh_id;
	jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	jra->jra_lcookie.lgc_index = h->lrh_index;
	INIT_LIST_HEAD(&jra->jra_committed_link);
	/* publish the job on the in-flight list before sending, so
	 * osp_sync_inflight_conflict() can see it */
	spin_lock(&d->opd_syn_lock);
	list_add_tail(&jra->jra_inflight_link, &d->opd_syn_inflight_list);
	spin_unlock(&d->opd_syn_lock);

	ptlrpcd_add_req(req);
}
650
651
652 /**
653  * Allocate and prepare RPC for a new change.
654  *
655  * The function allocates and initializes an RPC which will be sent soon to
656  * apply the change to the target OST. The request is initialized from the
657  * llog record passed. Notice only the fields common to all type of changes
658  * are initialized.
659  *
660  * \param[in] d         OSP device
661  * \param[in] op        type of the change
662  * \param[in] format    request format to be used
663  *
664  * \retval pointer              new request on success
665  * \retval ERR_PTR(errno)       on error
666  */
667 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
668                                                ost_cmd_t op,
669                                                const struct req_format *format)
670 {
671         struct ptlrpc_request   *req;
672         struct obd_import       *imp;
673         int                      rc;
674
675         /* Prepare the request */
676         imp = d->opd_obd->u.cli.cl_import;
677         LASSERT(imp);
678
679         if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
680                 RETURN(ERR_PTR(-ENOMEM));
681
682         req = ptlrpc_request_alloc(imp, format);
683         if (req == NULL)
684                 RETURN(ERR_PTR(-ENOMEM));
685
686         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
687         if (rc) {
688                 ptlrpc_req_finished(req);
689                 return ERR_PTR(rc);
690         }
691
692         req->rq_interpret_reply = osp_sync_interpret;
693         req->rq_commit_cb = osp_sync_request_commit_cb;
694         req->rq_cb_data = d;
695
696         ptlrpc_request_set_replen(req);
697
698         return req;
699 }
700
701 /**
702  * Generate a request for setattr change.
703  *
704  * The function prepares a new RPC, initializes it with setattr specific
705  * bits and send the RPC.
706  *
707  * \param[in] d         OSP device
708  * \param[in] llh       llog handle where the record is stored
709  * \param[in] h         llog record
710  *
711  * \retval 0            on success
712  * \retval 1            on invalid record
713  * \retval negative     negated errno on error
714  */
static int osp_sync_new_setattr_job(struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *h)
{
	struct llog_setattr64_rec	*rec = (struct llog_setattr64_rec *)h;
	struct ptlrpc_request		*req;
	struct ost_body			*body;

	ENTRY;
	LASSERT(h->lrh_type == MDS_SETATTR64_REC);

	if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
		RETURN(1);

	/* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
	 * so no bits other than these should be set. */
	if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
	    OBD_MD_FLPROJID)) != 0) {
		CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
			d->opd_obd->obd_name, rec->lsr_valid);
		/* return 1 on invalid record */
		RETURN(1);
	}

	req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
	if (IS_ERR(req))
		RETURN(PTR_ERR(req));

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	body->oa.o_oi = rec->lsr_oi;
	body->oa.o_uid = rec->lsr_uid;
	body->oa.o_gid = rec->lsr_gid;
	body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
	/* a longer record is the v2 layout which carries projid */
	if (h->lrh_len > sizeof(struct llog_setattr64_rec))
		body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
				      rec)->lsr_projid;

	/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
	 * we assume that both UID and GID are valid in that case. */
	if (rec->lsr_valid == 0)
		body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
	else
		body->oa.o_valid |= rec->lsr_valid;

	osp_sync_send_new_rpc(d, llh, h, req);
	RETURN(0);
}
763
764 /**
765  * Generate a request for unlink change.
766  *
767  * The function prepares a new RPC, initializes it with unlink(destroy)
768  * specific bits and sends the RPC. The function is used to handle
769  * llog_unlink_rec which were used in the older versions of Lustre.
770  * Current version uses llog_unlink_rec64.
771  *
772  * \param[in] d         OSP device
773  * \param[in] llh       llog handle where the record is stored
774  * \param[in] h         llog record
775  *
776  * \retval 0            on success
777  * \retval negative     negated errno on error
778  */
779 static int osp_sync_new_unlink_job(struct osp_device *d,
780                                    struct llog_handle *llh,
781                                    struct llog_rec_hdr *h)
782 {
783         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
784         struct ptlrpc_request   *req;
785         struct ost_body         *body;
786         int rc;
787
788         ENTRY;
789         LASSERT(h->lrh_type == MDS_UNLINK_REC);
790
791         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
792         if (IS_ERR(req))
793                 RETURN(PTR_ERR(req));
794
795         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
796         LASSERT(body);
797         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
798         rc = ostid_set_id(&body->oa.o_oi, rec->lur_oid);
799         if (rc)
800                 return rc;
801         body->oa.o_misc = rec->lur_count;
802         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
803         if (rec->lur_count)
804                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
805
806         osp_sync_send_new_rpc(d, llh, h, req);
807         RETURN(0);
808 }
809
810 /**
811  * Generate a request for unlink change.
812  *
813  * The function prepares a new RPC, initializes it with unlink(destroy)
814  * specific bits and sends the RPC. Depending on the target (MDT or OST)
815  * two different protocols are used. For MDT we use OUT (basically OSD API
816  * updates transferred via a network). For OST we still use the old
817  * protocol (OBD?), originally for compatibility. Later we can start to
818  * use OUT for OST as well, this will allow batching and better code
819  * unification.
820  *
821  * \param[in] d         OSP device
822  * \param[in] llh       llog handle where the record is stored
823  * \param[in] h         llog record
824  *
825  * \retval 0            on success
826  * \retval negative     negated errno on error
827  */
828 static int osp_sync_new_unlink64_job(struct osp_device *d,
829                                      struct llog_handle *llh,
830                                      struct llog_rec_hdr *h)
831 {
832         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
833         struct ptlrpc_request           *req = NULL;
834         struct ost_body                 *body;
835         int                              rc;
836
837         ENTRY;
838         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
839         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
840         if (IS_ERR(req))
841                 RETURN(PTR_ERR(req));
842
843         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
844         if (body == NULL)
845                 RETURN(-EFAULT);
846         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
847         if (rc < 0)
848                 RETURN(rc);
849         body->oa.o_misc = rec->lur_count;
850         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
851                            OBD_MD_FLOBJCOUNT;
852         osp_sync_send_new_rpc(d, llh, h, req);
853         RETURN(0);
854 }
855
856 /**
857  * Process llog records.
858  *
859  * This function is called to process the llog records committed locally.
860  * In the recovery model used by OSP we can apply a change to a remote
861  * target once corresponding transaction (like posix unlink) is committed
862  * locally so can't revert.
863  * Depending on the llog record type, a given handler is called that is
864  * responsible for preparing and sending the RPC to apply the change.
865  * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
866  *
867  * \param[in] env       LU environment provided by the caller
868  * \param[in] d         OSP device
869  * \param[in] llh       llog handle where the record is stored
870  * \param[in] rec       llog record
871  */
static void osp_sync_process_record(const struct lu_env *env,
				    struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *rec)
{
	struct llog_handle	*cathandle = llh->u.phd.phd_cat_handle;
	struct llog_cookie	 cookie;
	int			 rc = 0;

	ENTRY;

	/* cookie identifying this record in the catalog; used below to
	 * cancel the record when it is a generation mark or invalid */
	cookie.lgc_lgl = llh->lgh_id;
	cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	cookie.lgc_index = rec->lrh_index;

	if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
		struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

		/* we're waiting for the record generated by this instance */
		LASSERT(d->opd_syn_prev_done == 0);
		if (!memcmp(&d->opd_syn_generation, &gen->lgr_gen,
			    sizeof(gen->lgr_gen))) {
			CDEBUG(D_HA, "processed all old entries\n");
			d->opd_syn_prev_done = 1;
		}

		/* cancel any generation record */
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

		RETURN_EXIT;
	}

	/*
	 * now we prepare and fill requests to OST, put them on the queue
	 * and fire after next commit callback
	 */

	/* notice we increment counters before sending RPC, to be consistent
	 * in RPC interpret callback which may happen very quickly */
	atomic_inc(&d->opd_syn_rpc_in_flight);
	atomic_inc(&d->opd_syn_rpc_in_progress);

	/* dispatch on record type: each handler builds and queues the RPC
	 * applying the change; rc != 0 means the RPC was NOT queued */
	switch (rec->lrh_type) {
	/* case MDS_UNLINK_REC is kept for compatibility */
	case MDS_UNLINK_REC:
		rc = osp_sync_new_unlink_job(d, llh, rec);
		break;
	case MDS_UNLINK64_REC:
		rc = osp_sync_new_unlink64_job(d, llh, rec);
		break;
	case MDS_SETATTR64_REC:
		rc = osp_sync_new_setattr_job(d, llh, rec);
		break;
	default:
		CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
		       rec->lrh_type);
		/* treat "unknown record type" as "invalid" */
		rc = 1;
		break;
	}

	/* For all kinds of records, no matter successful or not,
	 * we should decrease changes and bump last_processed_id.
	 */
	if (d->opd_syn_prev_done) {
		__u64 correct_id = osp_sync_correct_id(d, rec);
		LASSERT(atomic_read(&d->opd_syn_changes) > 0);
		LASSERT(correct_id <= d->opd_syn_last_committed_id);
		/* NOTE: it's possible to meet same id if
		 * OST stores few stripes of same file
		 */
		while (1) {
			/* another thread may be trying to set new value */
			rmb();
			/* NOTE(review): this relies on a read barrier only;
			 * presumably safe because ids only grow -- confirm */
			if (correct_id > d->opd_syn_last_processed_id) {
				d->opd_syn_last_processed_id = correct_id;
				wake_up(&d->opd_syn_barrier_waitq);
			} else
				break;
		}
		atomic_dec(&d->opd_syn_changes);
	}
	if (rc != 0) {
		/* no RPC was queued, so no interpret callback will run:
		 * undo the counters bumped optimistically above */
		atomic_dec(&d->opd_syn_rpc_in_flight);
		atomic_dec(&d->opd_syn_rpc_in_progress);
	}

	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	/* Delete the invalid record */
	if (rc == 1) {
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
		if (rc != 0)
			CERROR("%s: can't delete invalid record: "
			       "fid = "DFID", rec_id = %u, rc = %d\n",
			       d->opd_obd->obd_name,
			       PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
			       rec->lrh_id, rc);
	}

	CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
	       rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

	RETURN_EXIT;
}
979
980 /**
981  * Cancel llog records for the committed changes.
982  *
983  * The function walks through the list of the committed RPCs and cancels
984  * corresponding llog records. see osp_sync_request_commit_cb() for the
985  * details.
986  *
987  * \param[in] env       LU environment provided by the caller
988  * \param[in] d         OSP device
989  */
static void osp_sync_process_committed(const struct lu_env *env,
				       struct osp_device *d)
{
	struct obd_device	*obd = d->opd_obd;
	struct obd_import	*imp = obd->u.cli.cl_import;
	struct ost_body		*body;
	struct ptlrpc_request	*req;
	struct llog_ctxt	*ctxt;
	struct llog_handle	*llh;
	struct list_head	 list;
	int			 rc, done = 0;

	ENTRY;

	/* fast path: nothing has been reported committed by the target */
	if (list_empty(&d->opd_syn_committed_there))
		return;

	/*
	 * if current status is -ENOSPC (lack of free space on OST)
	 * then we should poll OST immediately once object destroy
	 * is committed.
	 * notice: we do this upon commit as well because some backends
	 * (like DMU) do not release space right away.
	 */
	if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
		osp_statfs_need_now(d);

	/*
	 * now cancel them all
	 * XXX: can we improve this using some batching?
	 *      with batch RPC that'll happen automatically?
	 * XXX: can we store ctxt in lod_device and save few cycles ?
	 */
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	llh = ctxt->loc_handle;
	LASSERT(llh);

	/* drain the shared list into a private one under the lock so the
	 * (possibly slow) cancellation below runs lock-free */
	INIT_LIST_HEAD(&list);
	spin_lock(&d->opd_syn_lock);
	list_splice(&d->opd_syn_committed_there, &list);
	INIT_LIST_HEAD(&d->opd_syn_committed_there);
	spin_unlock(&d->opd_syn_lock);

	while (!list_empty(&list)) {
		struct osp_job_req_args *jra;

		jra = list_entry(list.next, struct osp_job_req_args,
				 jra_committed_link);
		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
		list_del_init(&jra->jra_committed_link);

		/* jra lives inside the request's rq_async_args, so the
		 * request is recovered from the embedded pointer */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);
		/* import can be closing, thus all commit cb's are
		 * called we can check committness directly */
		if (req->rq_import_generation == imp->imp_generation) {
			rc = llog_cat_cancel_records(env, llh, 1,
						     &jra->jra_lcookie);
			if (rc)
				CERROR("%s: can't cancel record: %d\n",
				       obd->obd_name, rc);
		} else {
			DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
				  imp->imp_peer_committed_transno);
		}
		/* drop the reference taken when the job was queued */
		ptlrpc_req_finished(req);
		done++;
	}

	llog_ctxt_put(ctxt);

	/* account all reaped jobs at once */
	LASSERT(atomic_read(&d->opd_syn_rpc_in_progress) >= done);
	atomic_sub(done, &d->opd_syn_rpc_in_progress);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_flight),
	       atomic_read(&d->opd_syn_rpc_in_progress));

	osp_sync_check_for_work(d);

	/* wake up the thread if requested to stop:
	 * it might be waiting for in-progress to complete */
	if (unlikely(osp_sync_running(d) == 0))
		wake_up(&d->opd_syn_waitq);

	EXIT;
}
1081
1082 /**
1083  * The core of the syncing mechanism.
1084  *
1085  * This is a callback called by the llog processing function. Essentially it
1086  * suspends llog processing until there is a record to process (it's supposed
1087  * to be committed locally). The function handles RPCs committed by the target
1088  * and cancels corresponding llog records.
1089  *
1090  * \param[in] env       LU environment provided by the caller
1091  * \param[in] llh       llog handle we're processing
1092  * \param[in] rec       current llog record
1093  * \param[in] data      callback data containing a pointer to the device
1094  *
1095  * \retval 0                    to ask the caller (llog_process()) to continue
1096  * \retval LLOG_PROC_BREAK      to ask the caller to break
1097  */
static int osp_sync_process_queues(const struct lu_env *env,
				   struct llog_handle *llh,
				   struct llog_rec_hdr *rec,
				   void *data)
{
	struct osp_device	*d = data;

	do {
		struct l_wait_info lwi = { 0 };

		if (!osp_sync_running(d)) {
			CDEBUG(D_HA, "stop llog processing\n");
			return LLOG_PROC_BREAK;
		}

		/* process requests committed by OST */
		osp_sync_process_committed(env, d);

		/* if there are changes to be processed and we have
		 * resources for this ... do now */
		if (osp_sync_can_process_new(d, rec)) {
			if (llh == NULL) {
				/* ask llog for another record */
				CDEBUG(D_HA, "%u changes, %u in progress,"
				       " %u in flight\n",
				       atomic_read(&d->opd_syn_changes),
				       atomic_read(&d->opd_syn_rpc_in_progress),
				       atomic_read(&d->opd_syn_rpc_in_flight));
				return 0;
			}
			osp_sync_process_record(env, d, llh, rec);
			/* record consumed: clear both so later iterations
			 * only reap committed RPCs / wait for capacity
			 * until llog_process() calls us with a new record */
			llh = NULL;
			rec = NULL;
		}

		if (d->opd_syn_last_processed_id == d->opd_syn_last_used_id)
			osp_sync_remove_from_tracker(d);

		/* suspend until there is something to do: stop request,
		 * capacity for a new record, or committed RPCs to reap */
		l_wait_event(d->opd_syn_waitq,
			     !osp_sync_running(d) ||
			     osp_sync_can_process_new(d, rec) ||
			     !list_empty(&d->opd_syn_committed_there),
			     &lwi);
	} while (1);
}
1143
1144 /**
1145  * OSP sync thread.
1146  *
1147  * This thread runs llog_cat_process() scanner calling our callback
1148  * to process llog records. in the callback we implement tricky
1149  * state machine as we don't want to start scanning of the llog again
1150  * and again, also we don't want to process too many records and send
1151  * too many RPCs a time. so, depending on current load (num of changes
1152  * being synced to OST) the callback can suspend awaiting for some
1153  * new conditions, like syncs completed.
1154  *
1155  * In order to process llog records left by previous boots and to allow
1156  * llog_process_thread() to find something (otherwise it'd just exit
 * immediately) we add a special GENERATION record on each boot.
1158  *
1159  * \param[in] _arg      a pointer to thread's arguments
1160  *
1161  * \retval 0            on success
1162  * \retval negative     negated errno on error
1163  */
static int osp_sync_thread(void *_arg)
{
	struct osp_device	*d = _arg;
	struct ptlrpc_thread	*thread = &d->opd_syn_thread;
	struct l_wait_info	 lwi = { 0 };
	struct llog_ctxt	*ctxt;
	struct obd_device	*obd = d->opd_obd;
	struct llog_handle	*llh;
	struct lu_env		 env;
	int			 rc, count;

	ENTRY;

	rc = lu_env_init(&env, LCT_LOCAL);
	if (rc) {
		CERROR("%s: can't initialize env: rc = %d\n",
		       obd->obd_name, rc);

		/* report startup failure to osp_sync_init(), which is
		 * blocked on t_ctl_waitq until we are RUNNING/STOPPED */
		spin_lock(&d->opd_syn_lock);
		thread->t_flags = SVC_STOPPED;
		spin_unlock(&d->opd_syn_lock);
		wake_up(&thread->t_ctl_waitq);

		RETURN(rc);
	}

	/* signal osp_sync_init() that the thread is up */
	spin_lock(&d->opd_syn_lock);
	thread->t_flags = SVC_RUNNING;
	spin_unlock(&d->opd_syn_lock);
	wake_up(&thread->t_ctl_waitq);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL) {
		CERROR("can't get appropriate context\n");
		GOTO(out, rc = -EINVAL);
	}

	llh = ctxt->loc_handle;
	if (llh == NULL) {
		CERROR("can't get llh\n");
		llog_ctxt_put(ctxt);
		GOTO(out, rc = -EINVAL);
	}

	/* the callback (osp_sync_process_queues) normally blocks inside
	 * llog_cat_process(), so this only returns on stop or error */
	rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
	if (rc < 0) {
		CERROR("%s: llog process with osp_sync_process_queues "
		       "failed: %d\n", d->opd_obd->obd_name, rc);
		GOTO(close, rc);
	}
	LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
		 "%u changes, %u in progress, %u in flight: %d\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight), rc);

	/* we don't expect llog_process_thread() to exit till umount */
	LASSERTF(thread->t_flags != SVC_RUNNING,
		 "%u changes, %u in progress, %u in flight\n",
		 atomic_read(&d->opd_syn_changes),
		 atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight));

	/* wait till all the requests are completed */
	count = 0;
	while (atomic_read(&d->opd_syn_rpc_in_progress) > 0) {
		osp_sync_process_committed(&env, d);

		/* asserts below if in-progress RPCs have not drained
		 * after 10 five-second timeouts */
		lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
		rc = l_wait_event(d->opd_syn_waitq,
				  atomic_read(&d->opd_syn_rpc_in_progress) == 0,
				  &lwi);
		if (rc == -ETIMEDOUT)
			count++;
		LASSERTF(count < 10, "%s: %d %d %sempty\n",
			 d->opd_obd->obd_name,
			 atomic_read(&d->opd_syn_rpc_in_progress),
			 atomic_read(&d->opd_syn_rpc_in_flight),
			 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	}

close:
	llog_cat_close(&env, llh);
	rc = llog_cleanup(&env, ctxt);
	if (rc)
		CERROR("can't cleanup llog: %d\n", rc);
out:
	LASSERTF(atomic_read(&d->opd_syn_rpc_in_progress) == 0,
		 "%s: %d %d %sempty\n",
		 d->opd_obd->obd_name, atomic_read(&d->opd_syn_rpc_in_progress),
		 atomic_read(&d->opd_syn_rpc_in_flight),
		 list_empty(&d->opd_syn_committed_there) ? "" : "!");

	/* let osp_sync_fini() know the thread has exited */
	thread->t_flags = SVC_STOPPED;

	wake_up(&thread->t_ctl_waitq);

	lu_env_fini(&env);

	RETURN(0);
}
1266
1267 /**
1268  * Initialize llog.
1269  *
1270  * Initializes the llog. Specific llog to be used depends on the type of the
 * target OSP represents (OST or MDT). The function appends a new llog
1272  * record to mark the place where the records associated with this boot
1273  * start.
1274  *
1275  * \param[in] env       LU environment provided by the caller
1276  * \param[in] d         OSP device
1277  *
1278  * \retval 0            on success
1279  * \retval negative     negated errno on error
1280  */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct lu_fid		*fid = &osi->osi_fid;
	struct llog_handle	*lgh = NULL;
	struct obd_device	*obd = d->opd_obd;
	struct llog_ctxt	*ctxt;
	int			rc;

	ENTRY;

	LASSERT(obd);

	/*
	 * open llog corresponding to our OST
	 */
	OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
	obd->obd_lvfs_ctxt.dt = d->opd_storage;

	lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

	/* look up the catalog id recorded for this target index */
	rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc < 0) {
		if (rc != -EFAULT) {
			CERROR("%s: can't get id from catalogs: rc = %d\n",
			       obd->obd_name, rc);
			RETURN(rc);
		}

		/* After sparse OST indices is supported, the CATALOG file
		 * may become a sparse file that results in failure on
		 * reading. Skip this error as the llog will be created
		 * later */
		memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
		rc = 0;
	}

	CDEBUG(D_INFO, "%s: Init llog for %d - catid "DFID":%x\n",
	       obd->obd_name, d->opd_index,
	       PFID(&osi->osi_cid.lci_logid.lgl_oi.oi_fid),
	       osi->osi_cid.lci_logid.lgl_ogen);

	rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
			d->opd_storage->dd_lu_dev.ld_obd,
			&osp_mds_ost_orig_logops);
	if (rc)
		RETURN(rc);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	/* non-zero catalog id: try to open the existing llog first */
	if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
		rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
			       LLOG_OPEN_EXISTS);
		/* re-create llog if it is missing */
		if (rc == -ENOENT)
			logid_set_id(&osi->osi_cid.lci_logid, 0);
		else if (rc < 0)
			GOTO(out_cleanup, rc);
	}

	/* zero id (fresh setup or missing llog): create a new one */
	if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
		rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
		if (rc < 0)
			GOTO(out_cleanup, rc);
		osi->osi_cid.lci_logid = lgh->lgh_id;
	}

	LASSERT(lgh != NULL);
	ctxt->loc_handle = lgh;

	rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
	if (rc)
		GOTO(out_close, rc);

	/* persist the (possibly new) catalog id for the next boot */
	rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc)
		GOTO(out_close, rc);

	/*
	 * put a mark in the llog till which we'll be processing
	 * old records restless
	 */
	d->opd_syn_generation.mnt_cnt = cfs_time_current();
	d->opd_syn_generation.conn_cnt = cfs_time_current();

	osi->osi_hdr.lrh_type = LLOG_GEN_REC;
	osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

	memcpy(&osi->osi_gen.lgr_gen, &d->opd_syn_generation,
	       sizeof(osi->osi_gen.lgr_gen));

	rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
	if (rc < 0)
		GOTO(out_close, rc);
	llog_ctxt_put(ctxt);
	RETURN(0);
out_close:
	llog_cat_close(env, lgh);
out_cleanup:
	llog_cleanup(env, ctxt);
	RETURN(rc);
}
1386
1387 /**
1388  * Cleanup llog used for syncing.
1389  *
1390  * Closes and cleanups the llog. The function is called when the device is
1391  * shutting down.
1392  *
1393  * \param[in] env       LU environment provided by the caller
1394  * \param[in] d         OSP device
1395  */
1396 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1397 {
1398         struct llog_ctxt *ctxt;
1399
1400         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1401         if (ctxt) {
1402                 llog_cat_close(env, ctxt->loc_handle);
1403                 llog_cleanup(env, ctxt);
1404         }
1405 }
1406
1407 /**
1408  * Initialization of the sync component of OSP.
1409  *
1410  * Initializes the llog and starts a new thread to handle the changes to
1411  * the remote target (OST or MDT).
1412  *
1413  * \param[in] env       LU environment provided by the caller
1414  * \param[in] d         OSP device
1415  *
1416  * \retval 0            on success
1417  * \retval negative     negated errno on error
1418  */
1419 int osp_sync_init(const struct lu_env *env, struct osp_device *d)
1420 {
1421         struct l_wait_info       lwi = { 0 };
1422         struct task_struct      *task;
1423         int                      rc;
1424
1425         ENTRY;
1426
1427         d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
1428         d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
1429         spin_lock_init(&d->opd_syn_lock);
1430         init_waitqueue_head(&d->opd_syn_waitq);
1431         init_waitqueue_head(&d->opd_syn_barrier_waitq);
1432         thread_set_flags(&d->opd_syn_thread, SVC_INIT);
1433         init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
1434         INIT_LIST_HEAD(&d->opd_syn_inflight_list);
1435         INIT_LIST_HEAD(&d->opd_syn_committed_there);
1436
1437         if (d->opd_storage->dd_rdonly)
1438                 RETURN(0);
1439
1440         rc = osp_sync_id_traction_init(d);
1441         if (rc)
1442                 RETURN(rc);
1443
1444         /*
1445          * initialize llog storing changes
1446          */
1447         rc = osp_sync_llog_init(env, d);
1448         if (rc) {
1449                 CERROR("%s: can't initialize llog: rc = %d\n",
1450                        d->opd_obd->obd_name, rc);
1451                 GOTO(err_id, rc);
1452         }
1453
1454         /*
1455          * Start synchronization thread
1456          */
1457         task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
1458                            d->opd_index, d->opd_group);
1459         if (IS_ERR(task)) {
1460                 rc = PTR_ERR(task);
1461                 CERROR("%s: cannot start sync thread: rc = %d\n",
1462                        d->opd_obd->obd_name, rc);
1463                 GOTO(err_llog, rc);
1464         }
1465
1466         l_wait_event(d->opd_syn_thread.t_ctl_waitq,
1467                      osp_sync_running(d) || osp_sync_stopped(d), &lwi);
1468
1469         RETURN(0);
1470 err_llog:
1471         osp_sync_llog_fini(env, d);
1472 err_id:
1473         osp_sync_id_traction_fini(d);
1474         return rc;
1475 }
1476
1477 /**
1478  * Stop the syncing thread.
1479  *
1480  * Asks the syncing thread to stop and wait until it's stopped.
1481  *
1482  * \param[in] d         OSP device
1483  *
1484  * \retval              0
1485  */
1486 int osp_sync_fini(struct osp_device *d)
1487 {
1488         struct ptlrpc_thread *thread = &d->opd_syn_thread;
1489
1490         ENTRY;
1491
1492         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1493                 thread->t_flags = SVC_STOPPING;
1494                 wake_up(&d->opd_syn_waitq);
1495                 wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
1496         }
1497
1498         /*
1499          * unregister transaction callbacks only when sync thread
1500          * has finished operations with llog
1501          */
1502         osp_sync_id_traction_fini(d);
1503
1504         RETURN(0);
1505 }
1506
/* serializes lookups and modifications of osp_id_tracker_list below */
static DEFINE_MUTEX(osp_id_tracker_sem);
/* global list of id trackers, one per local OSD device
 * (shared by all OSPs on that OSD, see osp_sync_id_traction_init()) */
static struct list_head osp_id_tracker_list =
		LIST_HEAD_INIT(osp_id_tracker_list);
1510
1511 /**
1512  * OSD commit callback.
1513  *
1514  * The function is used as a local OSD commit callback to track the highest
1515  * committed llog record id. see osp_sync_id_traction_init() for the details.
1516  *
1517  * \param[in] th        local transaction handle committed
1518  * \param[in] cookie    commit callback data (our private structure)
1519  */
1520 static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
1521 {
1522         struct osp_id_tracker   *tr = cookie;
1523         struct osp_device       *d;
1524         struct osp_txn_info     *txn;
1525
1526         LASSERT(tr);
1527
1528         txn = osp_txn_info(&th->th_ctx);
1529         if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
1530                 return;
1531
1532         spin_lock(&tr->otr_lock);
1533         if (likely(txn->oti_current_id > tr->otr_committed_id)) {
1534                 CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
1535                        tr->otr_committed_id, txn->oti_current_id);
1536                 tr->otr_committed_id = txn->oti_current_id;
1537
1538                 list_for_each_entry(d, &tr->otr_wakeup_list,
1539                                     opd_syn_ontrack) {
1540                         d->opd_syn_last_committed_id = tr->otr_committed_id;
1541                         wake_up(&d->opd_syn_waitq);
1542                 }
1543         }
1544         spin_unlock(&tr->otr_lock);
1545 }
1546
1547 /**
1548  * Initialize commit tracking mechanism.
1549  *
1550  * Some setups may have thousands of OSTs and each will be represented by OSP.
 * Meaning orders of magnitude more changes to apply every second. In order
 * to keep the number of commit callbacks low this mechanism was introduced.
 * The mechanism is very similar to transno used by the MDT service: it's a
 * single ID stream which can be assigned by any OSP to its llog records. The
 * tricky part is that the ID is stored in per-transaction data and re-used
 * by all the OSPs involved in that transaction. Then all these OSPs are
 * woken up utilizing a single OSD commit callback.
 *
 * The function initializes the data used by the tracker described above.
 * A single tracker per OSD device is created.
1560  *
1561  * \param[in] d         OSP device
1562  *
1563  * \retval 0            on success
1564  * \retval negative     negated errno on error
1565  */
1566 static int osp_sync_id_traction_init(struct osp_device *d)
1567 {
1568         struct osp_id_tracker   *tr, *found = NULL;
1569         int                      rc = 0;
1570
1571         LASSERT(d);
1572         LASSERT(d->opd_storage);
1573         LASSERT(d->opd_syn_tracker == NULL);
1574         INIT_LIST_HEAD(&d->opd_syn_ontrack);
1575
1576         mutex_lock(&osp_id_tracker_sem);
1577         list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
1578                 if (tr->otr_dev == d->opd_storage) {
1579                         LASSERT(atomic_read(&tr->otr_refcount));
1580                         atomic_inc(&tr->otr_refcount);
1581                         d->opd_syn_tracker = tr;
1582                         found = tr;
1583                         break;
1584                 }
1585         }
1586
1587         if (found == NULL) {
1588                 rc = -ENOMEM;
1589                 OBD_ALLOC_PTR(tr);
1590                 if (tr) {
1591                         d->opd_syn_tracker = tr;
1592                         spin_lock_init(&tr->otr_lock);
1593                         tr->otr_dev = d->opd_storage;
1594                         tr->otr_next_id = 1;
1595                         tr->otr_committed_id = 0;
1596                         atomic_set(&tr->otr_refcount, 1);
1597                         INIT_LIST_HEAD(&tr->otr_wakeup_list);
1598                         list_add(&tr->otr_list, &osp_id_tracker_list);
1599                         tr->otr_tx_cb.dtc_txn_commit =
1600                                                 osp_sync_tracker_commit_cb;
1601                         tr->otr_tx_cb.dtc_cookie = tr;
1602                         tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
1603                         dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
1604                         rc = 0;
1605                 }
1606         }
1607         mutex_unlock(&osp_id_tracker_sem);
1608
1609         return rc;
1610 }
1611
1612 /**
1613  * Release commit tracker.
1614  *
1615  * Decrease a refcounter on the tracker used by the given OSP device \a d.
1616  * If no more users left, then the tracker is released.
1617  *
1618  * \param[in] d         OSP device
1619  */
1620 static void osp_sync_id_traction_fini(struct osp_device *d)
1621 {
1622         struct osp_id_tracker *tr;
1623
1624         ENTRY;
1625
1626         LASSERT(d);
1627         tr = d->opd_syn_tracker;
1628         if (tr == NULL) {
1629                 EXIT;
1630                 return;
1631         }
1632
1633         osp_sync_remove_from_tracker(d);
1634
1635         mutex_lock(&osp_id_tracker_sem);
1636         if (atomic_dec_and_test(&tr->otr_refcount)) {
1637                 dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
1638                 LASSERT(list_empty(&tr->otr_wakeup_list));
1639                 list_del(&tr->otr_list);
1640                 OBD_FREE_PTR(tr);
1641                 d->opd_syn_tracker = NULL;
1642         }
1643         mutex_unlock(&osp_id_tracker_sem);
1644
1645         EXIT;
1646 }
1647
1648 /**
1649  * Generate a new ID on a tracker.
1650  *
1651  * Generates a new ID using the tracker associated with the given OSP device
1652  * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
1653  * the wakeup list, so OSP won't miss when a transaction using the ID is
1654  * committed.
1655  *
1656  * \param[in] d         OSP device
1657  * \param[in] id        0 or ID generated previously
1658  *
1659  * \retval              ID the caller should use
1660  */
1661 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
1662 {
1663         struct osp_id_tracker *tr;
1664
1665         tr = d->opd_syn_tracker;
1666         LASSERT(tr);
1667
1668         /* XXX: we can improve this introducing per-cpu preallocated ids? */
1669         spin_lock(&tr->otr_lock);
1670         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
1671                 tr->otr_next_id = 0xfffffff0;
1672
1673         if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
1674                 spin_unlock(&tr->otr_lock);
1675                 CERROR("%s: next %llu, last synced %llu\n",
1676                        d->opd_obd->obd_name, tr->otr_next_id,
1677                        d->opd_syn_last_used_id);
1678                 LBUG();
1679         }
1680
1681         if (id == 0)
1682                 id = tr->otr_next_id++;
1683         if (id > d->opd_syn_last_used_id)
1684                 d->opd_syn_last_used_id = id;
1685         if (list_empty(&d->opd_syn_ontrack))
1686                 list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
1687         spin_unlock(&tr->otr_lock);
1688         CDEBUG(D_OTHER, "new id %llu\n", id);
1689
1690         return id;
1691 }
1692
1693 /**
1694  * Stop to propagate commit status to OSP.
1695  *
1696  * If the OSP does not have any llog records she's waiting to commit, then
1697  * it is possible to unsubscribe from wakeups from the tracking using this
1698  * method.
1699  *
1700  * \param[in] d         OSP device not willing to wakeup
1701  */
1702 static void osp_sync_remove_from_tracker(struct osp_device *d)
1703 {
1704         struct osp_id_tracker *tr;
1705
1706         tr = d->opd_syn_tracker;
1707         LASSERT(tr);
1708
1709         if (list_empty(&d->opd_syn_ontrack))
1710                 return;
1711
1712         spin_lock(&tr->otr_lock);
1713         list_del_init(&d->opd_syn_ontrack);
1714         spin_unlock(&tr->otr_lock);
1715 }
1716