可在阿里云 DataWorks 中直接使用的 World Bank Data API
代码下载:
"""
wbdata: A wrapper for the World Bank API
"""
# Package version; Cache also uses it as the name of the on-disk cache file.
__version__ = "0.3.0"

"""
wbdata.api: Where all the functions go
"""
import collections
import collections.abc
import datetime
import re
import warnings

import tabulate
from decorator import decorator

try:
    import pandas as pd
except ImportError:
    pd = None
# Root of the World Bank v2 REST API; every endpoint below is derived from it.
BASE_URL = "https://api.worldbank.org/v2"
COUNTRIES_URL = f"{BASE_URL}/countries"
ILEVEL_URL = f"{BASE_URL}/incomeLevels"
INDICATOR_URL = f"{BASE_URL}/indicators"
LTYPE_URL = f"{BASE_URL}/lendingTypes"
SOURCES_URL = f"{BASE_URL}/sources"
TOPIC_URL = f"{BASE_URL}/topics"

# Error message used by get_indicator when its filters are combined.
INDIC_ERROR = "Cannot specify more than one of indicator, source, and topic"
class WBSearchResult(list):
    """
    A list that prints out a user-friendly table when printed or returned on the
    command line

    Items are expected to be dict-like and have an "id" key and a "name" or
    "value" key
    """

    def __repr__(self):
        """
        Render the items as a two-column plain-text table.

        The "name" key is tried first; if any item lacks it (KeyError), the
        whole table is rendered again using the "value" key instead.
        """
        try:
            return self._as_table("name")
        except KeyError:
            return self._as_table("value")

    def _as_table(self, label):
        """Tabulate the "id" column next to the column stored under `label`."""
        return tabulate.tabulate(
            [[o["id"], o[label]] for o in self],
            headers=["id", label],
            tablefmt="simple",
        )
# The pandas subclasses can only be defined when pandas was importable.
if pd:

    class WBSeries(pd.Series):
        """
        A pandas Series with a last_updated attribute
        """

        # Extra attribute names pandas should carry over to derived objects.
        _metadata = ["last_updated"]

        @property
        def _constructor(self):
            # Make pandas operations build WBSeries rather than plain Series.
            return WBSeries

    class WBDataFrame(pd.DataFrame):
        """
        A pandas DataFrame with a last_updated attribute
        """

        # Extra attribute names pandas should carry over to derived objects.
        _metadata = ["last_updated"]

        @property
        def _constructor(self):
            # Make pandas operations build WBDataFrame rather than DataFrame.
            return WBDataFrame
@decorator
def uses_pandas(f, *args, **kwargs):
    """
    Raise ValueError if pandas is not loaded, otherwise call the wrapped function

    :f: the wrapped function
    :args: positional arguments forwarded to f
    :kwargs: keyword arguments forwarded to f
    :returns: whatever f returns
    """
    # pd is None when the optional pandas import failed at module load.
    if not pd:
        raise ValueError("Pandas must be installed to be used")
    return f(*args, **kwargs)
def parse_value_or_iterable(arg):
    """
    If arg is a single value, return it as a string; if an iterable, return a
    ;-joined string of all values

    :arg: a string, an int, or an iterable of strings and/or ints
    :returns: a string usable as a path segment or GET argument
    :raises TypeError: if arg is neither a string, an int, nor iterable
    """
    if isinstance(arg, str):
        return arg
    if isinstance(arg, int):
        return str(arg)
    # Stringify each element so iterables of numeric ids (e.g. source ids)
    # can be joined too; a non-iterable arg still raises TypeError here.
    return ";".join(str(item) for item in arg)
def convert_year_to_datetime(yearstr):
    """
    Return a datetime.datetime object from a %Y formatted string

    :yearstr: a year such as "2020"
    :returns: datetime.datetime for January 1st of that year
    """
    return datetime.datetime.strptime(yearstr, "%Y")
def convert_month_to_datetime(monthstr):
    """
    Return a datetime.datetime object from a %YM%m formatted string

    :monthstr: a year and month separated by "M", such as "2020M03"
    :returns: datetime.datetime for the first day of that month
    """
    parts = monthstr.split("M")
    return datetime.datetime(int(parts[0]), int(parts[1]), 1)
def convert_quarter_to_datetime(quarterstr):
    """
    Return a datetime.datetime object from a %YQ%# formatted string, where # is
    the desired quarter

    :quarterstr: a year and quarter separated by "Q", such as "2020Q3"
    :returns: datetime.datetime for the first day of that quarter
    """
    parts = quarterstr.split("Q")
    quarter = int(parts[1])
    # Quarters 1..4 start in months 1, 4, 7 and 10.
    month = quarter * 3 - 2
    return datetime.datetime(int(parts[0]), month, 1)
def convert_dates_to_datetime(data):
    """
    Convert the "date" field of every record to a datetime.datetime object,
    in place

    The date format (monthly, quarterly or yearly) is inferred from the first
    record and applied to all of them.

    :data: a list of dict-like records, each with a "date" key
    :returns: the same list, with convertible dates replaced by datetimes
    """
    # Nothing to inspect or convert; avoids an IndexError on data[0].
    if not data:
        return data
    first = data[0]["date"]
    # Already converted (e.g. on a second call): leave untouched.
    if isinstance(first, datetime.datetime):
        return data
    if "M" in first:
        converter = convert_month_to_datetime
    elif "Q" in first:
        converter = convert_quarter_to_datetime
    else:
        converter = convert_year_to_datetime
    for datum in data:
        datum_date = datum["date"]
        # Dates containing "MRV" or "-" are left as the original strings.
        if "MRV" in datum_date:
            continue
        if "-" in datum_date:
            continue
        datum["date"] = converter(datum_date)
    return data
def cast_float(value):
    """
    Return a floated value or none

    :value: anything accepted by float(), or something that is not
    :returns: float(value), or None if the conversion raises ValueError or
        TypeError
    """
    try:
        return float(value)
    except (ValueError, TypeError):
        return None
@uses_pandas
def get_series(
    indicator,
    country="all",
    data_date=None,
    freq="Y",
    source=None,
    convert_date=False,
    column_name="value",
    keep_levels=False,
    cache=True,
):
    """
    Retrieve indicators for given countries and years

    :indicator: the desired indicator code
    :country: a country code, sequence of country codes, or "all" (default)
    :data_date: the desired date as a datetime object or a 2-tuple with start
        and end dates
    :freq: the desired periodicity of the data, one of 'Y' (yearly), 'M'
        (monthly), or 'Q' (quarterly). The indicator may or may not support the
        specified frequency.
    :source: the specific source to retrieve data from (defaults on API to 2,
        World Development Indicators)
    :convert_date: if True, convert date field to a datetime.datetime object.
    :column_name: the desired name for the pandas column
    :keep_levels: if True, don't reduce the number of index levels returned if
        only getting one date or country
    :cache: use the cache
    :returns: WBSeries
    :raises ValueError: if pandas is not installed
    """
    raw_data = get_data(
        indicator=indicator,
        country=country,
        data_date=data_date,
        freq=freq,
        source=source,
        convert_date=convert_date,
        cache=cache,
    )
    df = pd.DataFrame(
        [[i["country"]["value"], i["date"], i["value"]] for i in raw_data],
        columns=["country", "date", column_name],
    )
    # Values that cannot be parsed as numbers become None.
    df[column_name] = df[column_name].map(cast_float)
    # Drop an index level that holds a single value, unless asked not to.
    if not keep_levels and len(df["country"].unique()) == 1:
        df = df.set_index("date")
    elif not keep_levels and len(df["date"].unique()) == 1:
        df = df.set_index("country")
    else:
        df = df.set_index(["country", "date"])
    series = WBSeries(df[column_name])
    series.last_updated = raw_data.last_updated
    return series
def data_date_to_str(data_date, freq):
    """
    Convert data_date to the appropriate representation based on freq

    :data_date: A datetime.datetime object to be formatted
    :freq: One of 'Y' (year), 'M' (month) or 'Q' (quarter)
    :returns: "2021" for 'Y', "2021M05" for 'M', "2021Q2" for 'Q'
    :raises ValueError: if freq is not one of 'Y', 'M' or 'Q'
    """
    if freq == "Y":
        return data_date.strftime("%Y")
    if freq == "M":
        return data_date.strftime("%YM%m")
    if freq == "Q":
        return f"{data_date.year}Q{(data_date.month - 1) // 3 + 1}"
    # Falling through used to return None, which silently dropped the date
    # filter or failed later inside str.join.
    raise ValueError(f"freq must be one of 'Y', 'M' or 'Q', got {freq!r}")
def get_data(
    indicator,
    country="all",
    data_date=None,
    freq="Y",
    source=None,
    convert_date=False,
    pandas=False,
    column_name="value",
    keep_levels=False,
    cache=True,
):
    """
    Retrieve indicators for given countries and years

    :indicator: the desired indicator code
    :country: a country code, sequence of country codes, or "all" (default)
    :data_date: the desired date as a datetime object or a 2-tuple with start
        and end dates
    :freq: the desired periodicity of the data, one of 'Y' (yearly), 'M'
        (monthly), or 'Q' (quarterly). The indicator may or may not support the
        specified frequency.
    :source: the specific source to retrieve data from (defaults on API to 2,
        World Development Indicators)
    :convert_date: if True, convert date field to a datetime.datetime object.
    :pandas: deprecated; if True, delegate to get_series and return its result
    :column_name: only used with pandas=True; forwarded to get_series
    :keep_levels: only used with pandas=True; forwarded to get_series
    :cache: use the cache
    :returns: list of dictionaries
    :raises TypeError: if country is neither a string nor an iterable
    """
    if pandas:
        warnings.warn(
            (
                "Argument 'pandas' is deprecated and will be removed in a "
                "future version. Use get_series or get_dataframe instead."
            ),
            PendingDeprecationWarning,
        )
        return get_series(
            indicator=indicator,
            country=country,
            data_date=data_date,
            # freq used to be dropped here, so the deprecated path always
            # queried with the yearly default.
            freq=freq,
            source=source,
            convert_date=convert_date,
            column_name=column_name,
            keep_levels=keep_levels,
            cache=cache,
        )
    try:
        c_part = parse_value_or_iterable(country)
    except TypeError:
        raise TypeError("'country' must be a string or iterable'")
    query_url = "/".join((COUNTRIES_URL, c_part, "indicators", indicator))
    args = {}
    if data_date:
        # A sequence is a (start, end) range joined with ":"; anything else
        # is a single date. The Sequence ABC lives in collections.abc; the
        # alias in collections was removed in Python 3.10.
        if isinstance(data_date, collections.abc.Sequence):
            args["date"] = ":".join(data_date_to_str(dd, freq) for dd in data_date)
        else:
            args["date"] = data_date_to_str(data_date, freq)
    if source:
        args["source"] = source
    data = fetch(query_url, args, cache=cache)
    if convert_date:
        data = convert_dates_to_datetime(data)
    return data
def id_only_query(query_url, query_id, cache):
    """
    Retrieve information when ids are the only arguments

    :query_url: the base url to use for the query
    :query_id: an id or sequence of ids; a falsy value queries the base url
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects describing results
    """
    if query_id:
        query_url = "/".join((query_url, parse_value_or_iterable(query_id)))
    # cache used to be ignored here, so cache=False still hit the cache.
    return WBSearchResult(fetch(query_url, cache=cache))
def get_source(source_id=None, cache=True):
    """
    Retrieve information on a source

    :source_id: a source id or sequence thereof. None returns all sources
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects describing selected
        sources
    """
    return id_only_query(SOURCES_URL, source_id, cache=cache)
def get_incomelevel(level_id=None, cache=True):
    """
    Retrieve information on an income level aggregate

    :level_id: a level id or sequence thereof. None returns all income level
        aggregates
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects describing selected
        income level aggregates
    """
    return id_only_query(ILEVEL_URL, level_id, cache=cache)
def get_topic(topic_id=None, cache=True):
    """
    Retrieve information on a topic

    :topic_id: a topic id or sequence thereof. None returns all topics
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects describing selected
        topic aggregates
    """
    return id_only_query(TOPIC_URL, topic_id, cache=cache)
def get_lendingtype(type_id=None, cache=True):
    """
    Retrieve information on a lending type aggregate

    :type_id: lending type id or sequence thereof. None returns all lending
        type aggregates
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects describing selected
        lending type aggregates
    """
    return id_only_query(LTYPE_URL, type_id, cache=cache)
def get_country(country_id=None, incomelevel=None, lendingtype=None, cache=True):
    """
    Retrieve information on a country or regional aggregate. Can specify
    either country_id, or the aggregates, but not both

    :country_id: a country id or sequence thereof. None returns all countries
        and aggregates.
    :incomelevel: desired incomelevel id or ids.
    :lendingtype: desired lendingtype id or ids.
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects representing each
        country
    :raises ValueError: if country_id is combined with incomelevel or
        lendingtype
    """
    if country_id:
        if incomelevel or lendingtype:
            raise ValueError("Can't specify country_id and aggregates")
        return id_only_query(COUNTRIES_URL, country_id, cache=cache)
    # No explicit ids: filter the full country list by the given aggregates.
    args = {}
    if incomelevel:
        args["incomeLevel"] = parse_value_or_iterable(incomelevel)
    if lendingtype:
        args["lendingType"] = parse_value_or_iterable(lendingtype)
    return WBSearchResult(fetch(COUNTRIES_URL, args, cache=cache))
def get_indicator(indicator=None, source=None, topic=None, cache=True):
    """
    Retrieve information about an indicator or indicators. Only one of
    indicator, source, and topic can be specified. Specifying none of the
    three will return all indicators.

    :indicator: an indicator code or sequence thereof
    :source: a source id or sequence thereof
    :topic: a topic id or sequence thereof
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects representing
        indicators
    :raises ValueError: if more than one of indicator, source and topic is
        given
    """
    # The three filters are mutually exclusive.
    if sum(1 for given in (indicator, source, topic) if given) > 1:
        raise ValueError(INDIC_ERROR)
    if indicator:
        query_url = "/".join((INDICATOR_URL, parse_value_or_iterable(indicator)))
    elif source:
        query_url = "/".join(
            (SOURCES_URL, parse_value_or_iterable(source), "indicators")
        )
    elif topic:
        query_url = "/".join((TOPIC_URL, parse_value_or_iterable(topic), "indicators"))
    else:
        query_url = INDICATOR_URL
    return WBSearchResult(fetch(query_url, cache=cache))
def search_indicators(query, source=None, topic=None, cache=True):
    """
    Search indicators for a certain regular expression. Only one of source or
    topic can be specified.

    :query: the regular expression to match (case-insensitively) against
        indicator names
    :source: if present, id of desired source
    :topic: if present, id of desired topic
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects representing search
        indicators
    """
    indicators = get_indicator(source=source, topic=topic, cache=cache)
    pattern = re.compile(query, re.IGNORECASE)
    return WBSearchResult(i for i in indicators if pattern.search(i["name"]))
def search_countries(query, incomelevel=None, lendingtype=None, cache=True):
    """
    Search countries by name. Very simple search.

    :query: the regular expression to match (case-insensitively) against
        country names
    :incomelevel: if present, search only the matching incomelevel
    :lendingtype: if present, search only the matching lendingtype
    :cache: use the cache
    :returns: WBSearchResult containing dictionary objects representing
        countries
    """
    countries = get_country(
        incomelevel=incomelevel, lendingtype=lendingtype, cache=cache
    )
    pattern = re.compile(query, re.IGNORECASE)
    return WBSearchResult(i for i in countries if pattern.search(i["name"]))
@uses_pandas
def get_dataframe(
    indicators,
    country="all",
    data_date=None,
    freq="Y",
    source=None,
    convert_date=False,
    keep_levels=False,
    cache=True,
):
    """
    Convenience function to download a set of indicators and merge them into a
    pandas DataFrame. The index will be the same as if calls were made to
    get_series separately.

    :indicators: A dictionary where the keys are desired indicators and the
        values are the desired column names
    :country: a country code, sequence of country codes, or "all" (default)
    :data_date: the desired date as a datetime object or a 2-sequence with
        start and end dates
    :freq: the desired periodicity of the data, one of 'Y' (yearly), 'M'
        (monthly), or 'Q' (quarterly). The indicator may or may not support the
        specified frequency.
    :source: the specific source to retrieve data from (defaults on API to 2,
        World Development Indicators)
    :convert_date: if True, convert date field to a datetime.datetime object.
    :keep_levels: if True don't reduce the number of index levels returned if
        only getting one date or country
    :cache: use the cache
    :returns: a WBDataFrame whose last_updated attribute maps each column name
        to the last_updated value of its series
    :raises ValueError: if pandas is not installed
    """
    # One series per indicator, renamed to the requested column name.
    serieses = [
        get_series(
            indicator=indicator,
            country=country,
            data_date=data_date,
            freq=freq,
            source=source,
            convert_date=convert_date,
            keep_levels=keep_levels,
            cache=cache,
        ).rename(name)
        for indicator, name in indicators.items()
    ]
    # Outer-join the series on their index, one column at a time.
    result = None
    for series in serieses:
        if result is None:
            result = series.to_frame()
        else:
            result = result.join(series.to_frame(), how="outer")
    result = WBDataFrame(result)
    result.last_updated = {i.name: i.last_updated for i in serieses}
    return result
"""
wbdata.fetcher: retrieve and cache queries
"""
import datetime
import json
import logging
import os
import pickle
import pprint
from pathlib import Path

import appdirs
import requests
# Cache entries this many days old or older are dropped when the cache loads.
EXP = 7
# Value sent as the per_page GET argument on every request.
PER_PAGE = 1000
# Date captured once at import; used to stamp and expire cache entries.
TODAY = datetime.date.today()
# Maximum number of connection attempts per request.
TRIES = 5
class WBResults(list):
    """
    A list of results with a last_updated attribute

    last_updated starts out as None; fetch sets it when the response
    metadata provides a date.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.last_updated = None
class Cache(object):
    """
    A pickle-backed cache of raw API responses, kept in ./cache/<version>

    Entries are stored as key -> (ordinal day stored, response); entries
    that are EXP days old or older are discarded when the cache is loaded.
    """

    def __init__(self):
        """Load the cache file from the current working directory, if any."""
        self.path = Path(os.getcwd()) / "cache" / __version__
        self.path.parent.mkdir(parents=True, exist_ok=True)
        try:
            with self.path.open("rb") as cachefile:
                # NOTE(security): pickle.load can execute arbitrary code;
                # the cache file must only ever come from a trusted source.
                stored = pickle.load(cachefile)
        except (IOError, EOFError, pickle.UnpicklingError):
            # Missing, empty or corrupt cache file: start from scratch.
            stored = {}
        self.cache = {
            key: (day, payload)
            for key, (day, payload) in stored.items()
            if (TODAY - datetime.date.fromordinal(day)).days < EXP
        }

    def __getitem__(self, key):
        """Return the cached response stored under key."""
        return self.cache[key][1]

    def __setitem__(self, key, value):
        """Store value under key, stamped with today's date, and save to disk."""
        self.cache[key] = TODAY.toordinal(), value
        self.sync()

    def __contains__(self, item):
        """Return True if item is a cached key."""
        return item in self.cache

    def sync(self):
        """Sync cache to disk"""
        with self.path.open("wb") as cachefile:
            pickle.dump(self.cache, cachefile)


# Module-level cache shared by every request; created (and read) at import.
CACHE = Cache()
def get_json_from_url(url, args):
    """
    Fetch a url directly from the World Bank, up to TRIES tries

    :url: the url to retrieve
    :args: a dictionary of GET arguments
    :returns: a string with the url contents
    :raises RuntimeError: if every attempt fails with a connection error
    """
    for _ in range(TRIES):
        try:
            return requests.get(url, params=args).text
        except requests.ConnectionError:
            # Retry; only give up after TRIES failed attempts.
            continue
    logging.error("Error connecting to %s", url)
    raise RuntimeError("Couldn't connect to API")
def get_response(url, args, cache=True):
    """
    Get single page response from World Bank API or from cache

    :url: the base url to be queried
    :args: a dictionary of GET arguments
    :cache: use the cache
    :returns: the decoded JSON response from the API
    """
    logging.debug("fetching %s", url)
    # Sort the arguments so the key does not depend on dict ordering.
    key = (url, tuple(sorted(args.items())))
    if cache and key in CACHE:
        return json.loads(CACHE[key])
    response = get_json_from_url(url, args)
    # Decode before caching so a body that is not valid JSON is never
    # written to the cache and replayed until it expires.
    parsed = json.loads(response)
    if cache:
        CACHE[key] = response
    return parsed
def fetch(url, args=None, cache=True):
    """
    Fetch data from the World Bank API or from cache.

    Given the base url, keep fetching results until there are no more pages.

    :url: the base url to be queried
    :args: a dictionary of GET arguments
    :cache: use the cache
    :returns: a WBResults list of dictionaries containing the response to the
        query
    :raises RuntimeError: if the API reports an error or the response does not
        have the expected [metadata, records] shape
    """
    # Copy so the caller's dictionary is never mutated.
    args = {} if args is None else dict(args)
    args["format"] = "json"
    args["per_page"] = PER_PAGE
    results = []
    pages, this_page = 0, 1
    while pages != this_page:
        response = get_response(url, args, cache=cache)
        try:
            # NOTE(review): the records element appears to be null when a
            # query matches nothing - confirm against the API docs; treat
            # it as an empty page instead of failing in extend().
            results.extend(response[1] or [])
            this_page = response[0]["page"]
            pages = response[0]["pages"]
        except (IndexError, KeyError):
            # Not a [metadata, records] pair: report the API error message
            # if there is one, otherwise dump the whole response.
            try:
                message = response[0]["message"][0]
                error_text = (
                    f"Got error {message['id']} ({message['key']}): "
                    f"{message['value']}"
                )
            except (IndexError, KeyError):
                error_text = f"Got unexpected response:\n{pprint.pformat(response)}"
            raise RuntimeError(error_text)
        logging.debug("Processed page %s of %s", this_page, pages)
        args["page"] = int(this_page) + 1
    for i in results:
        if "id" in i:
            i["id"] = i["id"].strip()
    results = WBResults(results)
    # Metadata of the last page; the date is absent for some endpoints and
    # a null value is skipped rather than passed to strptime.
    last_updated = response[0].get("lastupdated")
    if last_updated:
        results.last_updated = datetime.datetime.strptime(last_updated, "%Y-%m-%d")
    return results
用法:
if __name__ == "__main__":
    # Usage example: GDP per capita (PPP) for every country, 2020-2022.
    print("========================== start worldbank =====================")
    print(os.getcwd())
    # Keys are indicator codes, values are the resulting column names.
    indicators = {"NY.GDP.PCAP.PP.CD": "GDP per capita, PPP (current international $)"}
    dt = (datetime.datetime(2020, 1, 1), datetime.datetime(2022, 1, 1))
    # get_dataframe is defined at module level in this file; no WorldBankAPI
    # name is defined here, so it is called directly.
    # Pass country=["CHN", "USA"] to restrict the query to those countries.
    df = get_dataframe(indicators, convert_date=False, data_date=dt)
    print(df)
    print("========================== end ============================")