11/* eslint max-nested-callbacks: ["error", 8] */
22'use strict'
33
4- const { DAGNode, DAGLink, util } = require ( 'ipld-dag-pb' )
4+ const { DAGNode, DAGLink } = require ( 'ipld-dag-pb' )
55const CID = require ( 'cids' )
6- const map = require ( 'async/map' )
76const series = require ( 'async/series' )
87const parallel = require ( 'async/parallel' )
98const eachLimit = require ( 'async/eachLimit' )
109const waterfall = require ( 'async/waterfall' )
1110const detectLimit = require ( 'async/detectLimit' )
11+ const queue = require ( 'async/queue' )
1212const { Key } = require ( 'interface-datastore' )
1313const errCode = require ( 'err-code' )
1414const multicodec = require ( 'multicodec' )
@@ -43,6 +43,34 @@ class PinManager {
4343 this . recursivePins = new Set ( )
4444 }
4545
46+ _walkDag ( { cid, preload = false , onCid = ( ) => { } } , cb ) {
47+ const q = queue ( ( { cid } , done ) => {
48+ this . dag . get ( cid , { preload } , ( err , result ) => {
49+ if ( err ) {
50+ return done ( err )
51+ }
52+
53+ onCid ( cid )
54+
55+ if ( result . value . Links ) {
56+ q . push ( result . value . Links . map ( link => ( {
57+ cid : link . Hash
58+ } ) ) )
59+ }
60+
61+ done ( )
62+ } )
63+ } , concurrencyLimit )
64+ q . drain = ( ) => {
65+ cb ( )
66+ }
67+ q . error = ( err ) => {
68+ q . kill ( )
69+ cb ( err )
70+ }
71+ q . push ( { cid } )
72+ }
73+
4674 directKeys ( ) {
4775 return Array . from ( this . directPins , key => new CID ( key ) . buffer )
4876 }
@@ -51,30 +79,21 @@ class PinManager {
5179 return Array . from ( this . recursivePins , key => new CID ( key ) . buffer )
5280 }
5381
54- getIndirectKeys ( callback ) {
82+ getIndirectKeys ( { preload } , callback ) {
5583 const indirectKeys = new Set ( )
5684 eachLimit ( this . recursiveKeys ( ) , concurrencyLimit , ( multihash , cb ) => {
57- this . dag . _getRecursive ( multihash , ( err , nodes ) => {
58- if ( err ) {
59- return cb ( err )
60- }
61-
62- map ( nodes , ( node , cb ) => util . cid ( util . serialize ( node ) , {
63- cidVersion : 0
64- } ) . then ( cid => cb ( null , cid ) , cb ) , ( err , cids ) => {
65- if ( err ) {
66- return cb ( err )
85+ this . _walkDag ( {
86+ cid : new CID ( multihash ) ,
87+ preload : preload || false ,
88+ onCid : ( cid ) => {
89+ cid = cid . toString ( )
90+
91+ // recursive pins pre-empt indirect pins
92+ if ( ! this . recursivePins . has ( cid ) ) {
93+ indirectKeys . add ( cid )
6794 }
68-
69- cids
70- . map ( cid => cid . toString ( ) )
71- // recursive pins pre-empt indirect pins
72- . filter ( key => ! this . recursivePins . has ( key ) )
73- . forEach ( key => indirectKeys . add ( key ) )
74-
75- cb ( )
76- } )
77- } )
95+ }
96+ } , cb )
7897 } , ( err ) => {
7998 if ( err ) { return callback ( err ) }
8099 callback ( null , Array . from ( indirectKeys ) )
@@ -283,6 +302,13 @@ class PinManager {
283302 } )
284303 }
285304
305+ fetchCompleteDag ( cid , options , callback ) {
306+ this . _walkDag ( {
307+ cid,
308+ preload : options . preload
309+ } , callback )
310+ }
311+
286312 // Returns an error if the pin type is invalid
287313 static checkPinType ( type ) {
288314 if ( typeof type !== 'string' || ! Object . keys ( PinTypes ) . includes ( type ) ) {
0 commit comments