Hub component

The purpose of the BioThings hub component is to allow you to easily automate the parsing and uploading of your data to an Elasticsearch backend.

dumper

BaseDumper

class biothings.dataload.dumper.BaseDumper(src_name=None, src_root_folder=None, log_folder=None, no_confirm=True, archive=None)[source]
create_todump_list(force=False, **kwargs)[source]

Fill the self.to_dump list with {"remote": remote_path, "local": local_path} dict elements. This is the todo list for the dumper. It's a good place to check whether files need to be downloaded. If 'force' is True, all files will be considered for download.
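As a sketch of what a concrete implementation might look like (file names are hypothetical, and the method would live in a dumper subclass), create_todump_list() typically builds the local path from new_data_folder and only schedules downloads that are actually needed:

    import os

    def create_todump_list(self, force=False, **kwargs):
        # Hypothetical remote file names; a real dumper would list them
        # through its client (FTP listing, fixed URLs, etc.)
        for remote in ("data_part1.txt.gz", "data_part2.txt.gz"):
            local = os.path.join(self.new_data_folder, remote)
            # Download when forced, missing locally, or when the remote
            # copy looks better (see remote_is_better() below)
            if force or not os.path.exists(local) or self.remote_is_better(remote, local):
                self.to_dump.append({"remote": remote, "local": local})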

download(remotefile, localfile)[source]

Download 'remotefile' to the local location defined by 'localfile'. Return relevant information about remotefile (depends on the actual client).

dump(steps=None, force=False, job_manager=None, **kwargs)[source]

Dump (i.e. download) the resource as needed. This should be called after instance creation. The 'force' argument will force the dump; it is passed along to the create_todump_list() method.

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

need_prepare()[source]

Check whether some prepare step should be executed before running the dump.

new_data_folder

Generate a new data folder path using src_root_folder and the specified suffix attribute. Also sync the current (aka previous) data folder previously registered in the database. This method typically has to be called in create_todump_list() when the dumper actually knows some information about the resource, like the actual release.

post_download(remotefile, localfile)[source]

Placeholder to add a custom process once a file is downloaded. This is a good place to check the file's integrity. Optional.

post_dump()[source]

Placeholder to add a custom process once the whole resource has been dumped. Optional.

prepare_client()[source]

Do the initialization needed to make the client ready to dump files.

release_client()[source]

Do whatever is necessary (like closing a network connection) to "release" the client.

remote_is_better(remotefile, localfile)[source]

Compare the remote file to the local file and check whether the remote file is worth downloading (e.g., it is bigger or newer).
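How remote metadata is obtained is client-specific (FTP MDTM/SIZE commands, HTTP Last-Modified/Content-Length headers, ...), but the decision itself usually boils down to a comparison like this standalone sketch (the metadata arguments are hypothetical):

    import os

    def remote_is_better(remote_mtime, remote_size, localfile):
        # No local copy yet: always download
        if not os.path.exists(localfile):
            return True
        # Remote copy is newer or bigger than what we have locally
        if remote_mtime and remote_mtime > os.path.getmtime(localfile):
            return True
        if remote_size and remote_size > os.path.getsize(localfile):
            return True
        return False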

unprepare()[source]

Reset anything that's not picklable (so self can be pickled). Return what's been reset as a dict, so self can be restored once unpickled.

FTPDumper

class biothings.dataload.dumper.FTPDumper(src_name=None, src_root_folder=None, log_folder=None, no_confirm=True, archive=None)[source]
remote_is_better(remotefile, localfile)[source]

'remotefile' is a relative path from the current working dir (CWD_DIR); 'localfile' is an absolute path.

HTTPDumper

class biothings.dataload.dumper.HTTPDumper(src_name=None, src_root_folder=None, log_folder=None, no_confirm=True, archive=None)[source]

Dumper using HTTP protocol and “requests” library

download(remoteurl, localfile, headers={})[source]

kwargs will be passed to requests.Session.get()
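A minimal sketch of an HTTPDumper subclass (SRC_NAME and SRC_URL are hypothetical attributes used only for this example): create_todump_list() registers the URL/local-file pair, and extra headers passed to download() end up in requests.Session.get():

    import os

    from biothings.dataload.dumper import HTTPDumper

    class MyHTTPDumper(HTTPDumper):
        SRC_NAME = "mysource"                                        # hypothetical
        SRC_URL = "https://example.org/downloads/mysource.json.gz"   # hypothetical

        def create_todump_list(self, force=False, **kwargs):
            local = os.path.join(self.new_data_folder, os.path.basename(self.SRC_URL))
            if force or not os.path.exists(local):
                self.to_dump.append({"remote": self.SRC_URL, "local": local})

        def download(self, remoteurl, localfile, headers={}):
            # Extra headers are forwarded to requests.Session.get()
            headers.setdefault("User-Agent", "biothings-hub")
            return super(MyHTTPDumper, self).download(remoteurl, localfile, headers=headers)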

GoogleDriveDumper

class biothings.dataload.dumper.GoogleDriveDumper(src_name=None, src_root_folder=None, log_folder=None, no_confirm=True, archive=None)[source]
download(remoteurl, localfile)[source]

remoteurl is a Google Drive link containing a document ID, or just the document ID itself.

WgetDumper

class biothings.dataload.dumper.WgetDumper(src_name=None, src_root_folder=None, log_folder=None, no_confirm=True, archive=None)[source]
create_todump_list(force=False, **kwargs)[source]

Fill the self.to_dump list with {"remote": remote_path, "local": local_path} dict elements. This is the todo list for the dumper. It's a good place to check whether files need to be downloaded. If 'force' is True, all files will be considered for download.

prepare_client()[source]

Check if ‘wget’ executable exists

DummyDumper

class biothings.dataload.dumper.DummyDumper(*args, **kwargs)[source]

DummyDumper will do nothing... (useful for datasources that can't be downloaded anymore but still need to be integrated, i.e. fill src_dump, etc.)

ManualDumper

class biothings.dataload.dumper.ManualDumper(*args, **kwargs)[source]

This dumper will assist the user in dumping a resource. It usually expects the files to be downloaded first (sometimes there's no easy way to automate this process). Once downloaded, a call to dump() will make sure everything is fine in terms of files and metadata.

uploader

BaseSourceUploader

class biothings.dataload.uploader.BaseSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

Default datasource uploader. Database storage can be done in batch or line by line. Duplicated records aren't allowed.

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

classmethod create(klass, db_conn_info, data_root, *args, **kwargs)[source]

Factory-like method: just return an instance of this uploader (used by SourceManager). May be overridden in a sub-class to generate more than one instance per class, like a true factory. This is useful when a resource is split into different collections but the data structure doesn't change (it's really just data split across multiple collections, usually for parallelization purposes). Instead of having an actual class for each split collection, the factory will generate them on-the-fly.

classmethod get_mapping()[source]

Return ES mapping
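As a sketch, get_mapping() returns a dict of field names mapped to Elasticsearch mapping fragments (the uploader name and fields below are hypothetical):

    from biothings.dataload.uploader import BaseSourceUploader

    class MySourceUploader(BaseSourceUploader):
        name = "mysource"   # hypothetical datasource name

        @classmethod
        def get_mapping(cls):
            # Each entry is a standard Elasticsearch field mapping
            return {
                "gene_symbol": {"type": "keyword"},   # exact-match field
                "description": {"type": "text"},      # full-text field
                "taxid": {"type": "integer"},
            }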

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

load(steps=['data', 'post', 'master', 'clean'], force=False, batch_size=10000, job_manager=None, **kwargs)[source]

Main resource load process; reads data from doc_c using chunks sized as batch_size. steps defines the different processes used to load the resource:
  • "data": will store actual data into single collections
  • "post": will perform post data load operations
  • "master": will register the master document in src_master

load_data(data_folder)[source]

Parse data inside data_folder and return a structure ready to be inserted in the database.
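A sketch of a load_data() implementation, written here as a plain generator function for brevity (the file pattern and record fields are hypothetical); in practice it would be a method on the uploader class:

    import glob
    import json
    import os

    def load_data(data_folder):
        # Parse every JSON file found in data_folder and yield documents
        # ready to be stored; each document is expected to carry an "_id"
        for path in glob.glob(os.path.join(data_folder, "*.json")):
            with open(path) as handle:
                for record in json.load(handle):
                    yield {"_id": record["id"], "symbol": record.get("symbol")}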

make_temp_collection()[source]

Create a temp collection for dataloading, e.g., entrez_geneinfo_INEMO.

post_update_data(steps, force, batch_size, job_manager, **kwargs)[source]

Override as needed to perform operations after data has been uploaded

prepare(state={})[source]

Sync uploader information with database (or given state dict)

prepare_src_dump()[source]

Sync collection information (src_doc) with the src_dump collection. Return the src_dump collection.

register_status(status, **extra)[source]

Register step status, ie. status for a sub-resource

setup_log()[source]

Setup and return a logger instance

switch_collection()[source]

After a successful load, rename temp_collection to the regular collection name, and rename the existing collection to a temp name for archiving purposes.

unprepare()[source]

Reset anything that's not picklable (so self can be pickled). Return what's been reset as a dict, so self can be restored once unpickled.

update_data(batch_size, job_manager)[source]

Iterate over load_data() to pull data and store it

NoBatchIgnoreDuplicatedSourceUploader

class biothings.dataload.uploader.NoBatchIgnoreDuplicatedSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

Same as the default uploader, but will store records and ignore any duplicated-record errors (use with caution...). Storage is done line by line (slow, not using a batch) but preserves the order of data in the input file.

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

storage_class

alias of NoBatchIgnoreDuplicatedStorage

IgnoreDuplicatedSourceUploader

class biothings.dataload.uploader.IgnoreDuplicatedSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

Same as the default uploader, but will store records and ignore any duplicated-record errors (use with caution...). Storage is done using batch and unordered bulk operations.

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

MergerSourceUploader

class biothings.dataload.uploader.MergerSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

storage_class

alias of MergerStorage

DummySourceUploader

class biothings.dataload.uploader.DummySourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

Dummy uploader: won't upload any data, assuming the data is already there, but makes sure every other bit of information is there for the overall process (useful when online data isn't available anymore).

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

ParallelizedSourceUploader

class biothings.dataload.uploader.ParallelizedSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

jobs()[source]

Return a list of argument tuples passed to self.load_data(), in order, one per parallelized job. Ex: [(x,1),(y,2),(z,3)]. If only one argument is required, it still must be passed as a 1-element tuple.
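A sketch of a jobs() implementation (the file pattern is hypothetical, and it assumes the uploader exposes its data folder as self.data_folder): one 1-element tuple per input file, each tuple being the arguments handed to load_data():

    import glob
    import os

    def jobs(self):
        # One parallelized job per chromosome file (hypothetical pattern)
        files = sorted(glob.glob(os.path.join(self.data_folder, "chr*.txt")))
        return [(input_file,) for input_file in files]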

NoDataSourceUploader

class biothings.dataload.uploader.NoDataSourceUploader(db_conn_info, data_root, collection_name=None, log_folder=None, *args, **kwargs)[source]

This uploader won't upload any data and won't even assume there's actual data (different from DummySourceUploader on this point). It's useful, for instance, when a mapping needs to be stored (get_mapping()) but the data doesn't come from an actual upload (i.e. it's generated).

db_conn_info is a database connection info tuple (host, port) used to fetch/store information about the datasource's state. data_root is the root folder containing all resources; the uploader will generate its own data folder from this point.

storage_class

alias of NoStorage

builder

DataBuilder

class biothings.databuild.builder.DataBuilder(build_name, source_backend, target_backend, log_folder, doc_root_key='root', mappers=[], default_mapper_class=<class 'biothings.databuild.mapper.TransparentMapper'>, sources=None, target_name=None, **kwargs)[source]
get_build_version()[source]

Generate an arbitrary major build version. The default is a timestamp (YYMMDD). The '.' char isn't allowed in a build version as it's reserved for minor versions.
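A sketch matching the default behavior described above (as an override on a builder subclass): a YYMMDD timestamp with no '.' characters:

    import time

    def get_build_version(self):
        # e.g. "180226" for Feb 26, 2018
        return time.strftime("%y%m%d")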

get_mapping(sources)[source]

Merge mappings from src_master

get_metadata(sources, job_manager)[source]

Return a dictionary of metadata for this build. It's usually app-specific and this method may be overridden as needed. Default: return "merge_stats" (number of docs involved in each single collection used in that build).

The returned dictionary will potentially be merged with existing metadata in the src_build collection. This behavior can be changed by setting a special key within the metadata dict: {"__REPLACE__": True} will replace existing metadata with the one returned here.

"job_manager" is passed in case parallelization is needed. Be aware that this method is already running in a dedicated thread; in order to use job_manager, the following code must be used at the very beginning of its implementation: asyncio.set_event_loop(job_manager.loop)
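A sketch of a get_metadata() override on a builder subclass (the stats values are made up for illustration):

    import asyncio

    def get_metadata(self, sources, job_manager):
        # Required first if job_manager is used from this dedicated thread
        asyncio.set_event_loop(job_manager.loop)
        return {
            "merge_stats": {"mysource": 123456},   # hypothetical doc count
            # "__REPLACE__": True,                 # uncomment to replace, not merge, existing metadata
        }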

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

merge_sources(source_names, steps=['merge', 'post'], batch_size=100000, ids=None, job_manager=None)[source]

Merge resources from the given source_names or from the build config. Root document sources are identified from the list so they are processed first. ids can be a list of specific documents to be merged.

register_status(status, transient=False, init=False, **extra)[source]

Register the current build status. A build status is a record in src_build. The key used in this dict is the target_name. Then, any operation acting on this target_name is registered in a "jobs" list.

resolve_sources(sources)[source]

A source can be a string that may contain regex chars. It's useful when you have plenty of sub-collections prefixed with a source name. For instance, given a source named 'blah' stored in as many collections as there are chromosomes, instead of passing each name as 'blah_1', 'blah_2', etc., 'blah_.*' can be specified in build_config. This method resolves potential regexed source names into real, existing collection names.
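A standalone sketch of the regex resolution described above (the collection list and the regex-detection heuristic are hypothetical):

    import re

    def resolve_sources(sources, existing_collections):
        resolved = []
        for src in sources:
            if set("*.^$[]+?") & set(src):           # looks like a regex
                pattern = re.compile(src)
                resolved.extend(col for col in existing_collections
                                if pattern.fullmatch(col))
            else:
                resolved.append(src)
        return resolved

    # resolve_sources(["blah_.*"], ["blah_1", "blah_2", "other"])
    # -> ["blah_1", "blah_2"]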

unprepare()[source]

Reset anything that's not picklable (so self can be pickled). Return what's been reset as a dict, so self can be restored once unpickled.

indexer

Indexer

class biothings.dataindex.indexer.Indexer(es_host, target_name=None)[source]
get_index_creation_settings()[source]

Override to return a dict containing some extra settings for index creation. Dict will be merged with mandatory settings, see biothings.utils.es.ESIndexer.create_index for more.
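A sketch of an Indexer subclass returning a couple of extra settings (the values shown are standard Elasticsearch index settings, chosen here purely for illustration); the returned dict is merged with the mandatory settings by the framework:

    from biothings.dataindex.indexer import Indexer

    class MyIndexer(Indexer):

        def get_index_creation_settings(self):
            return {
                "number_of_replicas": 0,    # no replicas while bulk indexing
                "refresh_interval": "-1",   # disable refresh during indexing
            }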

get_mapping(enable_timestamp=True)[source]

Collect mapping data from data sources. This is for GeneDocESBackend only.

get_pinfo()[source]

Return dict containing information about the current process (used to report in the hub)

index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode='index')[source]

Build an index named "index_name" with data from collection "target_collection". "ids" can be passed to selectively index documents. "mode" can have the following values:
  • 'purge': will delete the index if it exists
  • 'resume': will use the existing index and add documents. "ids" can be passed as a list of missing IDs, or, if not passed, ES will be queried to identify which IDs are missing for each batch in order to complete the index.
  • None (default): will create a new index, assuming it doesn't already exist
load_build(target_name=None)[source]

Load build info from src_build collection.

post_index(target_name, index_name, job_manager, steps=['index', 'post'], batch_size=10000, ids=None, mode=None)[source]

Override in sub-class to add a post-index process. Method’s signature is the same as index() to get the full context. This method will run in a thread (using job_manager.defer_to_thread())

differ

BaseDiffer

class biothings.databuild.differ.BaseDiffer(diff_func, job_manager, log_folder)[source]
diff(old_db_col_names, new_db_col_names, batch_size=100000, steps=['content', 'mapping'], mode=None, exclude=[])[source]

Wrapper over the diff_cols() coroutine; returns a task.

diff_cols(old_db_col_names, new_db_col_names, batch_size=100000, steps=['count', 'content', 'mapping'], mode=None, exclude=[])[source]

Compare new with old collections and produce diff files. Root keys can be excluded from the comparison with the "exclude" parameter. *_db_col_names can be (see the sketch after this docstring):

  1. a collection name (as a string), assuming it is in the target database.
  2. a tuple with 2 elements: the first is either "source" or "target", to respectively specify the src or target database, and the second is the collection name.
  3. a tuple with 3 elements (URI, db, collection), looking like ("mongodb://user:pass@host", "dbname", "collection"), allowing to specify any connection on any server.

steps: 'count' will count the root keys for every document in the new collection (to check the number of docs from datasources). 'content' will perform a diff on the actual content. 'mapping' will perform a diff on the ES mappings (if a target collection is involved).

mode: ‘purge’ will remove any existing files for this comparison.
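The three *_db_col_names forms above, written as Python values (collection names, URI and credentials are hypothetical):

    old = "mybuild_20180101"                                        # 1. plain collection name (target DB)
    new = ("target", "mybuild_20180201")                            # 2. (source|target, collection name)
    other = ("mongodb://user:pass@host", "dbname", "collection")    # 3. (URI, db, collection)

    # e.g. differ.diff(old, new, steps=["count", "content", "mapping"])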

JsonDiffer

class biothings.databuild.differ.JsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

SelfContainedJsonDiffer

class biothings.databuild.differ.SelfContainedJsonDiffer(diff_func=<function diff_docs_jsonpatch>, *args, **kwargs)[source]

DiffReportRendererBase

class biothings.databuild.differ.DiffReportRendererBase(max_reported_ids=None, max_randomly_picked=None, detailed=False)[source]
save(report, filename)[source]

Save report output (rendered) into filename

DiffReportTxt

class biothings.databuild.differ.DiffReportTxt(max_reported_ids=None, max_randomly_picked=None, detailed=False)[source]

syncer

BaseSyncer

class biothings.databuild.syncer.BaseSyncer(job_manager, log_folder)[source]
sync(diff_folder=None, batch_size=10000, mode=None, target_backend=None, steps=['mapping', 'content', 'meta'])[source]

Wrapper over the sync_cols() coroutine; returns a task.

sync_cols(diff_folder, batch_size=10000, mode=None, force=False, target_backend=None, steps=['mapping', 'content', 'meta'])[source]

Sync a collection with diff files located in diff_folder. This folder contains a metadata.json file which describes the different involved collections: "old" is the collection/index to be synced, "new" is the collection that should be obtained once all diff files are applied (not used, just informative). If target_backend is specified (bt.databuild.backend.create_backend() notation), then it will replace "old" (that is, the one being synced).

MongoJsonDiffSyncer

class biothings.databuild.syncer.MongoJsonDiffSyncer(job_manager, log_folder)[source]

MongoJsonDiffSelfContainedSyncer

class biothings.databuild.syncer.MongoJsonDiffSelfContainedSyncer(job_manager, log_folder)[source]

ESJsonDiffSyncer

class biothings.databuild.syncer.ESJsonDiffSyncer(job_manager, log_folder)[source]

ESJsonDiffSelfContainedSyncer

class biothings.databuild.syncer.ESJsonDiffSelfContainedSyncer(job_manager, log_folder)[source]