Source code for netallocation.cost

import numpy as np
from numpy import sign
import pandas as pd
from pypsa.descriptors import (get_switchable_as_dense as get_as_dense,
                               get_extendable_i, get_non_extendable_i,
                               nominal_attrs)
from xarray import DataArray, Dataset, concat
import pypsa
import logging

from .flow import flow_allocation, network_flow
from .utils import (reindex_by_bus_carrier, check_carriers, check_snapshots,
                    get_branches_i, split_one_ports, split_branches,
                    snapshot_weightings, split_one_ports,
                    get_as_dense_by_bus_carrier)
from .linalg import norm
from .convert import peer_to_peer, virtual_patterns
from .breakdown import expand_by_source_type
from .grid import (Incidence, impedance, energy_production, energy_demand,
                   power_production, Cycles)

logger = logging.getLogger(__name__)


[docs]def allocate_one_port_operational_cost(ds, n, snapshots=None, dim='source'): """ Allocate production costs on the basis of a peer-to-peer allocation. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network dim : str, default 'source' Dimension to whicht the production costs are associated to, (must contain bus names). Returns ------- xr.DataArray Allocated generation cost. """ snapshots = check_snapshots(ds.snapshot, n) check_carriers(n) ds = expand_by_source_type(ds, n, dim=dim) comps = ['Generator', 'StorageUnit', 'Store'] mc = get_as_dense_by_bus_carrier(n, 'marginal_cost', comps, snapshots)\ .rename(bus=dim, carrier='source_carrier') attr = {'payer': dim, 'allocation': 'one_port_operational_cost'} return (mc * ds).assign_attrs(attr)
[docs]def allocate_co2_cost(ds, n, dim='source', co2_constr_name=None, co2_attr='co2_emissions', co2_price=None): """ Allocate CO2 emission costs on the basis of a peer-to-peer allocation. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network dim : str, default 'source' Dimension to which the production costs are associated to, (must contain bus names). Returns ------- xr.DataArray Allocated generation cost. """ snapshots = check_snapshots(ds.snapshot, n) check_carriers(n) ds = expand_by_source_type(ds, n, dim=dim) ep = nodal_co2_price(n, snapshots, co2_attr, co2_constr_name, co2_price)\ .rename(bus=dim, carrier='source_carrier') ep = ep * snapshot_weightings(n, snapshots) attr = {'payer': dim, 'allocation': 'co2_cost'} return (ep * ds).assign_attrs(attr)
[docs]def allocate_one_port_investment_cost(ds, n, dim='source', proportional=False): """ Allocate investment costs on the basis of a peer-to-peer allocation. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network dim : str, default 'source' Dimension to whicht the investment costs are associated to, (must contain bus names). Returns ------- xr.DataArray Allocated generation cost. """ check_carriers(n) ds = expand_by_source_type(ds, n, dim=dim) attrs = {'payer': dim, 'allocation': 'one_port_investment_cost'} keys = {'bus': dim, 'carrier': 'source_carrier'} if not proportional: c = 'Generator' mu_upper = reindex_by_bus_carrier(n.pnl(c).mu_upper, c, n).rename(keys) return ( ds * mu_upper.reindex_like( ds, fill_value=0)).assign_attrs(attrs) comps = ['Generator'] attr = nominal_attrs nom_opt = concat([reindex_by_bus_carrier(n.df(c)[attr[c] + "_opt"], c, n) for c in comps], dim='carrier') cap_cost = concat((reindex_by_bus_carrier(n.df(c).capital_cost, c, n) for c in comps), dim='carrier') investment_cost = (nom_opt * cap_cost).rename(keys) prod = power_production(n, per_carrier=True).rename(keys) normed = (ds / prod.sum('snapshot')).fillna(0) return (investment_cost.reindex_like(normed, fill_value=0) * normed)\ .assign_attrs(attrs)
[docs]def allocate_branch_operational_cost(ds, n): """ Allocate the branch cost on the basis of an allocation method. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network Returns ------- xr.DataArray Allocated branch cost. """ snapshots = check_snapshots(ds.snapshot, n) check_carriers(n) branchcost_pu = pd.concat([get_as_dense(n, 'Link', 'marginal_cost', snapshots)], keys=['Link'], axis=1, names=['component', 'branch_i']) branchcost_pu = DataArray(branchcost_pu, dims=['snapshot', 'branch']) attr = {'allocation': 'branch_operational_cost'} return ( branchcost_pu.reindex_like( ds, fill_value=0) * ds).assign_attrs(attr)
[docs]def allocate_branch_investment_cost(ds, n, proportional=False): """ Allocate the branch cost on the basis of an allocation method. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network Returns ------- xr.DataArray Allocated branch cost. """ check_carriers(n) names = ['component', 'branch_i'] comps = np.unique(ds.component) attrs = {'allocation': 'branch_investment_cost'} if not proportional: mu = pd.concat({c: n.pnl(c).mu_upper - n.pnl(c).mu_lower for c in comps}, axis=1, names=names) mu = DataArray(mu, dims=['snapshot', 'branch']) return (ds * mu.reindex_like(ds, fill_value=0)).assign_attrs(attrs) nom_attr = pd.Series(nominal_attrs)[comps] + '_opt' flow = network_flow(n, branch_components=nom_attr.index) investment_cost = pd.concat({c: n.df(c).eval(f'capital_cost * {attr}') for c, attr in nom_attr.items()}, names=names) investment_cost = DataArray(investment_cost, dims='branch') normed = (ds / flow.sum('snapshot')).fillna(0) return (investment_cost * normed).assign_attrs(attrs)
[docs]def allocate_carrier_attribute(ds, n, attr): """ Allocate an carrier attribute on the basis of a peer-to-peer allocation. Parameters ---------- ds : xarray.Dataset Calculated power allocation dataset, i.e. from ntl.allocate_flow. n : pypsa.Network attr : str/pd.Series/pd.DataFrame Returns ------- xr.DataArray """ check_carriers(n) ds = expand_by_source_type(ds, n) return DataArray(n.carriers[attr], dims='source_carrier') * ds
# ============================================================================= # Power Market Quantities # =============================================================================
[docs]def locational_market_price(n, snapshots=None): """ Get the locational market price (LMP) of 1 MWh in a solved network. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. Returns ------- ma : xr.DataArray Marginal Price for dimensions {bus, snapshot}. """ snapshots = check_snapshots(snapshots, n) ma = n.buses_t.marginal_price.loc[snapshots] if isinstance(ma, pd.Series): return DataArray(ma, dims=['bus']).assign_coords(snapshot=snapshots) else: return DataArray(ma, dims=['snapshot', 'bus'])
[docs]def locational_market_price_diff(n, snapshots=None): return Incidence(n) @ locational_market_price(n, snapshots)
[docs]def cycle_constraint_price(n, snapshots=None): """ Calculate the cost per branch and snapshot for the cycle constraint Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. Returns ------- xr.DataArray Cycle cost for passive branches, dimension {snapshot, branch}. """ C = Cycles(n) snapshots = check_snapshots(snapshots, n) comps = n.passive_branch_components z = impedance(n, comps, snapshots) sp = n.sub_networks_t.mu_kirchhoff_voltage_law.loc[snapshots] shadowprice = DataArray(sp, dims=['snapshot', 'cycle']) * 1e5 return (C * z * shadowprice).sum('cycle')
[docs]def cycle_constraint_cost(n, snapshots=None): """ Calculate the cost per branch and snapshot for the cycle constraint Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. Returns ------- xr.DataArray Cycle cost for passive branches, dimension {snapshot, branch}. """ # Interesting fact: The cycle constraint cost are not weighted with # snapshot_weightings different to 1. C = [] i = 0 for sub in n.sub_networks.obj: coords = { 'branch': sub.branches().index.rename( ('component', 'branch_i')), 'cycle': range( i, i + sub.C.shape[1])} i += sub.C.shape[1] C.append(DataArray(sub.C.todense(), coords, ['branch', 'cycle'])) C = concat(C, dim='cycle').fillna(0) snapshots = check_snapshots(snapshots, n) comps = n.passive_branch_components f = network_flow(n, snapshots, comps) z = impedance(n, comps) sp = n.sub_networks_t.mu_kirchhoff_voltage_law.loc[snapshots] shadowprice = DataArray(sp, dims=['snapshot', 'cycle']) * 1e5 return (C * z * f * shadowprice).sum('cycle')
[docs]def congestion_revenue(n, snapshots=None, split=False): """ Calculate the congestion revenue (CR) per brnach and snapshot. The CR includes all costs of the transmission system. The sum over all snapshots of this is equal to the capital investments per branch. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. split : bool, optional If True, two CRs are returned, one indicating the renvenue for extendable branches, one for non-extendable branches. Returns ------- xr.DataArray Congestion Revenue, dimension {snapshot, branch}. """ cr = - locational_market_price_diff(n, snapshots).transpose() * \ network_flow(n, snapshots) * snapshot_weightings(n, snapshots) if 'mu_kirchhoff_voltage_law' in n.sub_networks_t: cr += cycle_constraint_cost(n, snapshots).reindex_like(cr, fill_value=0) else: logger.warning( ' The cost of cycle constraints cannot be calculated, as ' 'the shadowprices for those are missing. Please solve the ' 'network with `keep_shadowprices=True` for including them.') return split_branches(cr, n) if split else cr
[docs]def nodal_demand_cost(n, snapshots=None): """ Calculate the nodal demand cost per bus and snapshot. This is calculated by the product of power demand times the marginal price. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. """ snapshots = check_snapshots(snapshots, n) return ( energy_demand( n, snapshots) * locational_market_price( n, snapshots)) .rename('nodal_demand_cost')
[docs]def nodal_co2_price(n, snapshots=None, co2_attr='co2_emissions', co2_constr_name=None, price=None): """ Get the CO2 price per MWh_el. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. co2_attr : str, optional Name of the co2 emission attribut in n.carriers. The default is 'co2_emissions'. co2_constr_name : str, optional Name of the CO2 limit constraint in n.global_constraint. The default is None will lead to searching for constraints which contain the strings "CO2" and "Limit". """ c = 'Generator' if price is None: if co2_constr_name is None: con_i = n.global_constraints.index co2_constr_name = con_i[con_i.str.contains('CO2', case=False) & con_i.str.contains('Limit', case=False)] if co2_constr_name.empty: logger.warning('No CO2 constraint found.') return reindex_by_bus_carrier( pd.Series(0, n.generators.index), c, n) else: co2_constr_name = co2_constr_name[0] elif co2_constr_name not in n.global_constraints.index: logger.warning( f'Constraint {co2_constr_name} not in n.global_constraints' ', setting CO₂ constraint cost to zero.') return reindex_by_bus_carrier(pd.Series(0, n.generators.index), c, n) price = n.global_constraints.mu[co2_constr_name] eff_emission = n.df(c).carrier.map( n.carriers[co2_attr]) / n.df(c).efficiency return price * reindex_by_bus_carrier(eff_emission, c, n)
[docs]def nodal_co2_cost(n, snapshots=None, co2_attr='co2_emissions', co2_constr_name=None, split=False): """ Calculate the total system cost caused by the CO2 constraint. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. co2_attr : str, optional Name of the co2 emission attribut in n.carriers. The default is 'co2_emissions'. co2_constr_name : str, optional Name of the CO2 limit constraint in n.global_constraint. The default is None will lead to searching for constraints which contain the strings "CO2" and "Limit". """ price_per_gen = nodal_co2_price(n, snapshots, co2_attr, co2_constr_name) cost = (energy_production(n, snapshots, per_carrier=True) * price_per_gen) if split: return split_one_ports(cost, n).sum('carrier') return cost.sum('carrier').rename('nodal_co2_cost')
[docs]def nodal_production_revenue( n, snapshots=None, split=False, per_carrier=False): """ Calculate the nodal production revenue per bus and snapshot. This is calculated by the product of power production times the marginal price. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots The default None results in taking all snapshots of n. """ snapshots = check_snapshots(snapshots, n) if split: pr = energy_production(n, snapshots, per_carrier=True) * \ locational_market_price(n, snapshots) pr = split_one_ports(pr, n) return pr if per_carrier else pr.sum('carrier') return (energy_production(n, snapshots, per_carrier=per_carrier) * locational_market_price(n, snapshots))\ .rename('nodal_production_revenue')
[docs]def objective_constant(n): nom_attr = nominal_attrs.items() constant = 0 for c, attr in nom_attr: ext_i = get_extendable_i(n, c) constant += n.df(c)[attr][ext_i] @ n.df(c).capital_cost[ext_i] return constant
[docs]def allocate_revenue(n, snapshots=None, chunksize=None, with_cycle_prices=False, **kwargs): """ Allocate the market revenue per asset based Average Participation. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots method : str/xarray.Dataset Method on which the revenue allocation is based. Must be an available method for ``netallocation.allocate_flow``. Alternatively to a string, an calculated allocation xarray.Dataset can be passed. kwargs : dict Keyword arguments for ``flow_allocation()``. For operational and capital costs for branches, the allocation default to q=0, meaning that all transmission costs are covered by the sinks. For a 50%-50% split between sinks and sources, set q to 0.5. for transmission cost totally allocated to sources, set q to 1. Returns ------- xarray.DataArray Peer-to-peer cost allocation. """ if 'q' in kwargs and 'aggregated' in kwargs: if kwargs['q'] > 0 and not kwargs['aggregated']: logger.warning( 'Setting parameter q != 0 with non aggregated ' 'coupling will results in inaccurate cost allocation.') ds = flow_allocation(n, snapshots, 'ptpf', **kwargs) was_timestep = False if not ds.snapshot.shape: was_timestep = True ds = ds.expand_dims('snapshot') p2p = expand_by_source_type(ds.peer_to_peer, n, chunksize=chunksize) vfp = ds.peer_on_branch_to_peer.sum('source') lmp = locational_market_price(n, snapshots) lmp_diff = locational_market_price_diff(n, snapshots) if with_cycle_prices: lmp_diff -= cycle_constraint_price(n, snapshots) d = dict(sink='payer', branch='receiver_branch', source='receiver_node', source_carrier='receiver_carrier') res = Dataset({'dispatch_revenue': p2p * lmp.rename(bus='source'), 'congestion_revenue': - vfp * lmp_diff}) res = res.rename(d) return res.sel(snapshot=res.snapshot[0]) if was_timestep else res
[docs]def allocate_cost(n, snapshots=None, method='ap', chunksize=None, **kwargs): """ Allocate production cost based on an allocation method. Parameters ---------- n : pypsa.Network snapshots : subset of n.snapshots method : str/xarray.Dataset Method on which the cost allocation is based. Must be an available method for ``netallocation.allocate_flow``. Alternatively to a string, an calculated allocation xarray.Dataset can be passed. kwargs : dict Keyword arguments for ``flow_allocation()``. For operational and capital costs for branches, the allocation default to q=0, meaning that all transmission costs are covered by the sinks. For a 50%-50% split between sinks and sources, set q to 0.5. for transmission cost totally allocated to sources, set q to 1. Returns ------- xarray.DataArray Peer-to-peer cost allocation. """ if 'q' in kwargs and 'aggregated' in kwargs: if kwargs['q'] > 0 and not kwargs['aggregated']: logger.warning( 'Setting parameter q != 0 with non aggregated ' 'coupling will results in inaccurate cost allocation.') if isinstance(method, str): ds = flow_allocation(n, snapshots, method, **kwargs) else: ds = method was_timestep = False if not ds.snapshot.shape: was_timestep = True ds = ds.expand_dims('snapshot') # Does not seems right to hard-code q, but is only for necessary AP for # now. ds = virtual_patterns(ds, n, q=0) ds = peer_to_peer(ds, n) ds = expand_by_source_type(ds, n, chunksize=chunksize) p2p = ds.peer_to_peer vfp = ds.virtual_flow_pattern co2_price = kwargs.pop('co2_price', None) op_one_port = allocate_one_port_operational_cost(p2p, n) co2_one_port = allocate_co2_cost(p2p, n, co2_price=co2_price) inv_one_port = allocate_one_port_investment_cost(p2p, n) op_branch = allocate_branch_operational_cost(vfp, n) inv_branch = allocate_branch_investment_cost(vfp, n) d = dict(sink='payer', bus='payer', branch='receiver_branch', source='receiver_node', source_carrier='receiver_carrier') def rename(da): return da.rename( {k: v for k, v in d.items() if k in da.dims}) res = Dataset({da.attrs['allocation']: rename(da) for da in [op_one_port, co2_one_port, inv_one_port, op_branch, inv_branch] if da.sum() != 0}) return res.sel(snapshot=res.snapshot[0]) if was_timestep else res