| import csv |
| import json |
| import sys |
| import os |
| import shutil |
| import datetime |
| from dateutil.parser import parse |
| from random import random |
| from collections import defaultdict, Counter |
|
|
| from amlsim.account_data_type_lookup import AccountDataTypeLookup |
| from faker import Faker |
| import numpy as np |
|
|
|
|
def days_to_date(days):
    """Return the calendar date *days* days after 2017-01-01 as "YYYYMMDD"."""
    base = datetime.datetime(2017, 1, 1)
    return (base + datetime.timedelta(days=days)).strftime("%Y%m%d")
|
|
|
|
def get_simulator_name(csv_file):
    """Convert log file name to the simulator name.
    :param csv_file: Transaction log file name
    :return: Simulator name (the first four underscore-separated tokens)
    """
    head = csv_file.split("_")[:4]
    return "_".join(head)
|
|
|
|
def get_name(acct_id):
    """Build a display name ("Account<id>") for the given account ID."""
    return f"Account{acct_id!s}"
|
|
|
|
def get_bank(acct_id):
    """Build a bank label ("Bank<id>") for the given account ID."""
    return f"Bank{acct_id!s}"
|
|
|
|
# Transaction types representing physical cash movement; rows of these types
# are routed to the cash-transaction output file instead of the regular one.
CASH_TYPES = {"CASH-IN", "CASH-OUT"}
|
|
|
|
class AMLTypology:
    """Suspicious transaction and account group (one alert typology)."""

    def __init__(self, reason):
        self.is_sar = False           # becomes True once any SAR member is added
        self.main_acct = None         # last SAR member added (the "main" account)
        self.reason = reason          # typology reason label for this alert
        self.transactions = dict()    # tx_id -> (amount, days, orig, dest, orig_name, dest_name, attr)
        self.members = set()          # all member account IDs
        self.recorded_members = set() # members already written to the SAR account list
        self.total_amount = 0.0       # running sum of transaction amounts
        self.count = 0                # number of transactions recorded

    def add_member(self, member, is_sar):
        """Register a member account; a SAR member becomes the main account."""
        self.members.add(member)
        if is_sar:
            self.is_sar = True
            self.main_acct = member

    def add_tx(self, tx_id, amount, days, orig_acct, dest_acct, orig_name, dest_name, attr):
        """Record one transaction belonging to this typology."""
        entry = (amount, days, orig_acct, dest_acct, orig_name, dest_name, attr)
        self.transactions[tx_id] = entry
        self.total_amount += amount
        self.count += 1

    def get_reason(self):
        """Return the reason label of this typology."""
        return self.reason

    def get_start_date(self):
        """Return the date (YYYYMMDD) of the earliest recorded transaction."""
        earliest = min(tx[1] for tx in self.transactions.values())
        return days_to_date(earliest)

    def get_end_date(self):
        """Return the date (YYYYMMDD) of the latest recorded transaction."""
        latest = max(tx[1] for tx in self.transactions.values())
        return days_to_date(latest)
|
|
|
|
class Schema:
    """Parsed output-schema definition.

    For each output table (account, transaction, alert member, alert
    transaction, party individual/organization, account-party mapping and
    resolved entities) this builds, from the schema JSON:

    * parallel lists of column names, default values and value types,
    * a column-name -> index map, and
    * the indices of semantically special columns, identified by the
      "dataType" key of each column definition.

    The ``get_*_row`` helpers then assemble output CSV rows with the fixed
    fields placed at their schema positions, extra keyword attributes
    overriding columns by name, and "date"-typed columns converted from day
    offsets to ISO 8601 timestamps.
    """

    def __init__(self, data, base_date):
        """
        :param data: Parsed schema JSON (dict: table name -> list of column dicts)
        :param base_date: datetime used as day zero by days2date()
        """
        self._base_date = base_date
        self.data = data

        # Account list columns (special indices are declared but left None;
        # account rows are built via AccountDataTypeLookup elsewhere).
        self.acct_num_cols = None
        self.acct_names = list()
        self.acct_defaults = list()
        self.acct_types = list()
        self.acct_name2idx = dict()
        self.acct_id_idx = None
        self.acct_name_idx = None
        self.acct_balance_idx = None
        self.acct_start_idx = None
        self.acct_end_idx = None
        self.acct_sar_idx = None
        self.acct_model_idx = None
        self.acct_bank_idx = None

        # Transaction list columns
        self.tx_num_cols = None
        self.tx_names = list()
        self.tx_defaults = list()
        self.tx_types = list()
        self.tx_name2idx = dict()
        self.tx_id_idx = None
        self.tx_time_idx = None
        self.tx_amount_idx = None
        self.tx_type_idx = None
        self.tx_orig_idx = None
        self.tx_dest_idx = None
        self.tx_sar_idx = None
        self.tx_alert_idx = None

        # Alert member (account) list columns
        self.alert_acct_num_cols = None
        self.alert_acct_names = list()
        self.alert_acct_defaults = list()
        self.alert_acct_types = list()
        self.alert_acct_name2idx = dict()
        self.alert_acct_alert_idx = None
        self.alert_acct_reason_idx = None
        self.alert_acct_id_idx = None
        self.alert_acct_name_idx = None
        self.alert_acct_sar_idx = None
        self.alert_acct_model_idx = None
        self.alert_acct_schedule_idx = None
        self.alert_acct_bank_idx = None

        # Alert transaction list columns
        self.alert_tx_num_cols = None
        self.alert_tx_names = list()
        self.alert_tx_defaults = list()
        self.alert_tx_types = list()
        self.alert_tx_name2idx = dict()
        self.alert_tx_id_idx = None
        self.alert_tx_type_idx = None
        self.alert_tx_sar_idx = None
        self.alert_tx_idx = None
        self.alert_tx_orig_idx = None
        self.alert_tx_dest_idx = None
        self.alert_tx_tx_type_idx = None
        self.alert_tx_amount_idx = None
        self.alert_tx_time_idx = None

        # Party (individual) list columns
        self.party_ind_num_cols = None
        self.party_ind_names = list()
        self.party_ind_defaults = list()
        self.party_ind_types = list()
        self.party_ind_name2idx = dict()
        self.party_ind_id_idx = None

        # Party (organization) list columns
        self.party_org_num_cols = None
        self.party_org_names = list()
        self.party_org_defaults = list()
        self.party_org_types = list()
        self.party_org_name2idx = dict()
        self.party_org_id_idx = None

        # Account <-> party mapping list columns
        self.acct_party_num_cols = None
        self.acct_party_names = list()
        self.acct_party_defaults = list()
        self.acct_party_types = list()
        self.acct_party_name2idx = dict()
        self.acct_party_mapping_idx = None
        self.acct_party_acct_idx = None
        self.acct_party_party_idx = None

        # Party <-> party (resolved entities) list columns
        self.party_party_num_cols = None
        self.party_party_names = list()
        self.party_party_defaults = list()
        self.party_party_types = list()
        self.party_party_name2idx = dict()
        self.party_party_ref_idx = None
        self.party_party_first_idx = None
        self.party_party_second_idx = None

        self._parse()

    @staticmethod
    def _scan_columns(col_data, names, defaults, types, name2idx):
        """Fill the parallel column-metadata lists for one table.

        Appends each column's name, default value ("" if absent) and value
        type ("string" if absent) to the given lists, fills the name->index
        map, and returns a dataType -> column-index dict for the columns that
        declare a "dataType" (later duplicates win, as in the original
        per-table loops).
        """
        dtype2idx = dict()
        for idx, col in enumerate(col_data):
            name = col["name"]
            names.append(name)
            defaults.append(col.get("defaultValue", ""))
            types.append(col.get("valueType", "string"))
            name2idx[name] = idx
            d_type = col.get("dataType")
            if d_type is not None:
                dtype2idx[d_type] = idx
        return dtype2idx

    def _parse(self):
        """Read the column metadata of every output table from the schema JSON."""
        acct_data = self.data["account"]
        tx_data = self.data["transaction"]
        alert_tx_data = self.data["alert_tx"]
        alert_acct_data = self.data["alert_member"]
        party_ind_data = self.data["party_individual"]
        party_org_data = self.data["party_organization"]
        acct_party_data = self.data["account_mapping"]
        party_party_data = self.data["resolved_entities"]

        self.acct_num_cols = len(acct_data)
        self.tx_num_cols = len(tx_data)
        self.alert_tx_num_cols = len(alert_tx_data)
        self.alert_acct_num_cols = len(alert_acct_data)
        self.party_ind_num_cols = len(party_ind_data)
        self.party_org_num_cols = len(party_org_data)
        self.acct_party_num_cols = len(acct_party_data)
        self.party_party_num_cols = len(party_party_data)

        # Account columns.  CONSISTENCY FIX: the original loop, unlike every
        # other table, never filled acct_name2idx; it is populated here.  The
        # acct_*_idx special indices intentionally remain None, as before.
        self._scan_columns(acct_data, self.acct_names, self.acct_defaults,
                           self.acct_types, self.acct_name2idx)

        d2i = self._scan_columns(tx_data, self.tx_names, self.tx_defaults,
                                 self.tx_types, self.tx_name2idx)
        self.tx_id_idx = d2i.get("transaction_id")
        self.tx_time_idx = d2i.get("timestamp")
        self.tx_amount_idx = d2i.get("amount")
        self.tx_type_idx = d2i.get("transaction_type")
        self.tx_orig_idx = d2i.get("orig_id")
        self.tx_dest_idx = d2i.get("dest_id")
        self.tx_sar_idx = d2i.get("sar_flag")
        self.tx_alert_idx = d2i.get("alert_id")

        d2i = self._scan_columns(alert_acct_data, self.alert_acct_names,
                                 self.alert_acct_defaults, self.alert_acct_types,
                                 self.alert_acct_name2idx)
        self.alert_acct_alert_idx = d2i.get("alert_id")
        self.alert_acct_reason_idx = d2i.get("alert_type")
        self.alert_acct_id_idx = d2i.get("account_id")
        self.alert_acct_name_idx = d2i.get("account_name")
        self.alert_acct_sar_idx = d2i.get("sar_flag")
        self.alert_acct_model_idx = d2i.get("model_id")
        self.alert_acct_schedule_idx = d2i.get("schedule_id")
        self.alert_acct_bank_idx = d2i.get("bank_id")

        d2i = self._scan_columns(alert_tx_data, self.alert_tx_names,
                                 self.alert_tx_defaults, self.alert_tx_types,
                                 self.alert_tx_name2idx)
        self.alert_tx_id_idx = d2i.get("alert_id")
        self.alert_tx_type_idx = d2i.get("alert_type")
        self.alert_tx_sar_idx = d2i.get("sar_flag")
        self.alert_tx_idx = d2i.get("transaction_id")
        self.alert_tx_orig_idx = d2i.get("orig_id")
        self.alert_tx_dest_idx = d2i.get("dest_id")
        self.alert_tx_tx_type_idx = d2i.get("transaction_type")
        self.alert_tx_amount_idx = d2i.get("amount")
        self.alert_tx_time_idx = d2i.get("timestamp")

        d2i = self._scan_columns(party_ind_data, self.party_ind_names,
                                 self.party_ind_defaults, self.party_ind_types,
                                 self.party_ind_name2idx)
        self.party_ind_id_idx = d2i.get("party_id")

        d2i = self._scan_columns(party_org_data, self.party_org_names,
                                 self.party_org_defaults, self.party_org_types,
                                 self.party_org_name2idx)
        self.party_org_id_idx = d2i.get("party_id")

        d2i = self._scan_columns(acct_party_data, self.acct_party_names,
                                 self.acct_party_defaults, self.acct_party_types,
                                 self.acct_party_name2idx)
        self.acct_party_mapping_idx = d2i.get("mapping_id")
        self.acct_party_acct_idx = d2i.get("account_id")
        self.acct_party_party_idx = d2i.get("party_id")

        d2i = self._scan_columns(party_party_data, self.party_party_names,
                                 self.party_party_defaults, self.party_party_types,
                                 self.party_party_name2idx)
        self.party_party_ref_idx = d2i.get("ref_id")
        self.party_party_first_idx = d2i.get("first_id")
        self.party_party_second_idx = d2i.get("second_id")

    def days2date(self, _days):
        """Get date as ISO 8601 format from days from the "base_date". If failed, return an empty string.
        :param _days: Days from the "base_date"
        :return: Date as ISO 8601 format (with a trailing "Z"), or "" if
            _days is not parseable as an integer
        """
        try:
            num_days = int(_days)
        except ValueError:
            return ""
        dt = self._base_date + datetime.timedelta(num_days)
        return dt.isoformat() + "Z"

    def _fill_row(self, defaults, name2idx, v_types, fixed, attr):
        """Assemble one output row.

        :param defaults: Per-column default values (copied, not mutated)
        :param name2idx: Column-name -> index map for this table
        :param v_types: Per-column value types; "date" columns are converted
            via days2date()
        :param fixed: Iterable of (index, value) pairs for the fixed fields;
            applied first so that attr entries may override them by name
            (matching the original assignment order)
        :param attr: Extra column values keyed by schema column name; unknown
            names are ignored
        """
        row = list(defaults)
        for idx, value in fixed:
            row[idx] = value
        for name, value in attr.items():
            if name in name2idx:
                row[name2idx[name]] = value
        for idx, v_type in enumerate(v_types):
            if v_type == "date":
                row[idx] = self.days2date(row[idx])
        return row

    def get_tx_row(self, _tx_id, _timestamp, _amount, _tx_type, _orig, _dest, _is_sar, _alert_id, **attr):
        """Build one row for the transaction (or cash transaction) CSV."""
        fixed = [(self.tx_id_idx, _tx_id),
                 (self.tx_time_idx, _timestamp),
                 (self.tx_amount_idx, _amount),
                 (self.tx_type_idx, _tx_type),
                 (self.tx_orig_idx, _orig),
                 (self.tx_dest_idx, _dest),
                 (self.tx_sar_idx, _is_sar),
                 (self.tx_alert_idx, _alert_id)]
        return self._fill_row(self.tx_defaults, self.tx_name2idx, self.tx_types, fixed, attr)

    def get_alert_acct_row(self, _alert_id, _reason, _acct_id, _acct_name, _is_sar,
                           _model_id, _schedule_id, _bank_id, **attr):
        """Build one row for the alert member (account) CSV."""
        fixed = [(self.alert_acct_alert_idx, _alert_id),
                 (self.alert_acct_reason_idx, _reason),
                 (self.alert_acct_id_idx, _acct_id),
                 (self.alert_acct_name_idx, _acct_name),
                 (self.alert_acct_sar_idx, _is_sar),
                 (self.alert_acct_model_idx, _model_id),
                 (self.alert_acct_schedule_idx, _schedule_id),
                 (self.alert_acct_bank_idx, _bank_id)]
        return self._fill_row(self.alert_acct_defaults, self.alert_acct_name2idx,
                              self.alert_acct_types, fixed, attr)

    def get_alert_tx_row(self, _alert_id, _alert_type, _is_sar, _tx_id, _orig, _dest,
                         _tx_type, _amount, _timestamp, **attr):
        """Build one row for the alert transaction CSV."""
        fixed = [(self.alert_tx_id_idx, _alert_id),
                 (self.alert_tx_type_idx, _alert_type),
                 (self.alert_tx_sar_idx, _is_sar),
                 (self.alert_tx_idx, _tx_id),
                 (self.alert_tx_orig_idx, _orig),
                 (self.alert_tx_dest_idx, _dest),
                 (self.alert_tx_tx_type_idx, _tx_type),
                 (self.alert_tx_amount_idx, _amount),
                 (self.alert_tx_time_idx, _timestamp)]
        return self._fill_row(self.alert_tx_defaults, self.alert_tx_name2idx,
                              self.alert_tx_types, fixed, attr)

    def get_party_ind_row(self, _party_id, **attr):
        """Build one row for the party individual CSV."""
        fixed = [(self.party_ind_id_idx, _party_id)]
        return self._fill_row(self.party_ind_defaults, self.party_ind_name2idx,
                              self.party_ind_types, fixed, attr)

    def get_party_org_row(self, _party_id, **attr):
        """Build one row for the party organization CSV."""
        fixed = [(self.party_org_id_idx, _party_id)]
        return self._fill_row(self.party_org_defaults, self.party_org_name2idx,
                              self.party_org_types, fixed, attr)

    def get_acct_party_row(self, _mapping_id, _acct_id, _party_id, **attr):
        """Build one row for the account-party mapping CSV."""
        fixed = [(self.acct_party_mapping_idx, _mapping_id),
                 (self.acct_party_acct_idx, _acct_id),
                 (self.acct_party_party_idx, _party_id)]
        return self._fill_row(self.acct_party_defaults, self.acct_party_name2idx,
                              self.acct_party_types, fixed, attr)

    def get_party_party_row(self, _ref_id, _first_id, _second_id, **attr):
        """Build one row for the resolved entities (party-party) CSV."""
        fixed = [(self.party_party_ref_idx, _ref_id),
                 (self.party_party_first_idx, _first_id),
                 (self.party_party_second_idx, _second_id)]
        return self._fill_row(self.party_party_defaults, self.party_party_name2idx,
                              self.party_party_types, fixed, attr)
|
|
|
|
class LogConverter:
    """Convert AMLSim raw simulator logs into the schema-defined output CSVs."""

    def __init__(self, conf, sim_name=None, fake=None):
        """
        :param conf: Parsed configuration JSON
        :param sim_name: Simulation name; overrides conf "general.simulation_name"
        :param fake: Faker instance keyed by locale (e.g. fake['en_US']);
            required by convert_acct_tx() to generate party attributes
        """
        self.reports = dict()     # alert_id -> AMLTypology
        self.org_types = dict()   # account_id (int) -> raw account type string

        self.fake = fake

        general_conf = conf.get('general', {})
        input_conf = conf.get('temporal', {})
        output_conf = conf.get('output', {})

        self.sim_name = sim_name if sim_name is not None else general_conf.get("simulation_name", "sample")
        print("Simulation name:", self.sim_name)

        self.input_dir = os.path.join(input_conf.get('directory', ''), self.sim_name)
        self.work_dir = os.path.join(output_conf.get('directory', ''), self.sim_name)
        if not os.path.isdir(self.work_dir):
            os.makedirs(self.work_dir)

        # Load the output schema definition
        param_dir = conf.get('input', {}).get('directory', '')
        schema_file = conf.get('input', {}).get('schema', '')
        base_date_str = general_conf.get('base_date', '2017-01-01')
        base_date = parse(base_date_str)

        json_file = os.path.join(param_dir, schema_file)
        with open(json_file, "r") as rf:
            data = json.load(rf)
        self.schema = Schema(data, base_date)

        # Input files (produced by the simulator)
        self.log_file = os.path.join(self.work_dir, output_conf["transaction_log"])
        self.in_acct_file = input_conf["accounts"]
        self.group_file = input_conf["alert_members"]

        # Output files (written into work_dir)
        self.out_acct_file = output_conf["accounts"]
        self.tx_file = output_conf["transactions"]
        self.cash_tx_file = output_conf["cash_transactions"]
        self.sar_acct_file = output_conf["sar_accounts"]
        self.alert_tx_file = output_conf["alert_transactions"]
        self.alert_acct_file = output_conf["alert_members"]

        self.party_individual_file = output_conf["party_individuals"]
        self.party_organization_file = output_conf["party_organizations"]
        self.account_mapping_file = output_conf["account_mapping"]
        self.resolved_entities_file = output_conf["resolved_entities"]

        # Copy the diameter log to the output directory when it exists
        dia_log = output_conf["diameter_log"]
        src_dia_path = os.path.join(self.input_dir, dia_log)
        dst_dia_path = os.path.join(self.work_dir, dia_log)
        if os.path.exists(src_dia_path):
            shutil.copy(src_dia_path, dst_dia_path)

    def convert_acct_tx(self):
        """Convert the account list and raw transaction log into the output
        account, party, mapping, transaction, cash-transaction and
        alert-transaction CSV files.

        NOTE(review): alert transactions look up self.reports, so this is
        expected to run after convert_alert_members() — confirm in callers.
        """
        print("Convert transaction list from %s to %s, %s and %s" % (
            self.log_file, self.tx_file, self.cash_tx_file, self.alert_tx_file))

        in_acct_f = open(os.path.join(self.input_dir, self.in_acct_file), "r")
        in_tx_f = open(self.log_file, "r")

        out_acct_f = open(os.path.join(self.work_dir, self.out_acct_file), "w")
        out_tx_f = open(os.path.join(self.work_dir, self.tx_file), "w")
        out_cash_tx_f = open(os.path.join(self.work_dir, self.cash_tx_file), "w")
        out_alert_tx_f = open(os.path.join(self.work_dir, self.alert_tx_file), "w")

        out_ind_f = open(os.path.join(self.work_dir, self.party_individual_file), "w")
        out_org_f = open(os.path.join(self.work_dir, self.party_organization_file), "w")
        out_map_f = open(os.path.join(self.work_dir, self.account_mapping_file), "w")
        out_ent_f = open(os.path.join(self.work_dir, self.resolved_entities_file), "w")

        # ---- Account list conversion ----
        reader = csv.reader(in_acct_f)
        acct_writer = csv.writer(out_acct_f)
        acct_writer.writerow(self.schema.acct_names)

        ind_writer = csv.writer(out_ind_f)
        ind_writer.writerow(self.schema.party_ind_names)
        org_writer = csv.writer(out_org_f)
        org_writer.writerow(self.schema.party_org_names)
        map_writer = csv.writer(out_map_f)
        map_writer.writerow(self.schema.acct_party_names)
        ent_writer = csv.writer(out_ent_f)
        ent_writer.writerow(self.schema.party_party_names)

        header = next(reader)

        mapping_id = 1

        lookup = AccountDataTypeLookup()
        us_gen = self.fake['en_US']

        for row in reader:
            output_row = list(self.schema.acct_defaults)

            acct_type = ""
            acct_id = ""

            gender = np.random.choice(['Male', 'Female'], p=[0.5, 0.5])

            # Draw fake US addresses until one parses as
            # "street\ncity, ST ZIP" (Faker sometimes emits other shapes,
            # e.g. APO/FPO addresses — assumed, TODO confirm).
            good_address = False
            while not good_address:
                address = us_gen.address()
                split1 = address.split('\n')
                street_address = split1[0]
                split2 = split1[1].split(', ')
                if len(split2) == 2:
                    good_address = True
                    city = split2[0]
                    split3 = split2[1].split(' ')
                    state = split3[0]
                    postcode = split3[1]

            for output_index, output_item in enumerate(self.schema.data['account']):
                # Map schema dataType -> input CSV column via the lookup table
                if 'dataType' in output_item:
                    output_type = output_item['dataType']
                    input_type = lookup.inputType(output_type)

                    try:
                        input_index = header.index(input_type)
                    except ValueError:
                        continue  # no matching input column; keep the default

                    if output_type == "start_time":
                        try:
                            start = int(row[input_index])
                            if start >= 0:
                                output_row[output_index] = start
                        except ValueError:
                            pass  # non-numeric start time: keep the default

                    elif output_type == "end_time":
                        try:
                            end = int(row[input_index])
                            if end > 0:
                                output_row[output_index] = end
                        except ValueError:
                            pass  # non-numeric end time: keep the default

                    elif output_type == "account_id":
                        acct_id = row[input_index]
                        output_row[output_index] = acct_id

                    elif output_type == "account_type":
                        acct_type = row[input_index]
                        output_row[output_index] = acct_type

                    else:
                        output_row[output_index] = row[input_index]

                if 'valueType' in output_item:
                    if output_item['valueType'] == 'date':
                        output_row[output_index] = self.schema.days2date(output_row[output_index])

                # Overwrite PII-style columns with Faker-generated values
                if 'name' in output_item:
                    if output_item['name'] == 'first_name':
                        output_row[output_index] = us_gen.first_name_male() if gender == "Male" else us_gen.first_name_female()

                    elif output_item['name'] == 'last_name':
                        output_row[output_index] = us_gen.last_name_male() if gender == "Male" else us_gen.last_name_female()

                    elif output_item['name'] == 'street_addr':
                        output_row[output_index] = street_address

                    elif output_item['name'] == 'city':
                        output_row[output_index] = city

                    elif output_item['name'] == 'state':
                        output_row[output_index] = state

                    elif output_item['name'] == 'country':
                        output_row[output_index] = "US"

                    elif output_item['name'] == 'zip':
                        output_row[output_index] = postcode

                    elif output_item['name'] == 'gender':
                        output_row[output_index] = gender

                    elif output_item['name'] == 'birth_date':
                        output_row[output_index] = us_gen.date_of_birth()

                    elif output_item['name'] == 'ssn':
                        output_row[output_index] = us_gen.ssn()

                    elif output_item['name'] == 'lat':
                        output_row[output_index] = us_gen.latitude()

                    elif output_item['name'] == 'lon':
                        output_row[output_index] = us_gen.longitude()

            acct_writer.writerow(output_row)
            self.org_types[int(acct_id)] = acct_type

            # Randomly attach each account to either an individual or an
            # organization party with the same ID as the account.
            is_individual = random() >= 0.5
            party_id = str(acct_id)
            if is_individual:
                output_row = self.schema.get_party_ind_row(party_id)
                ind_writer.writerow(output_row)
            else:
                output_row = self.schema.get_party_org_row(party_id)
                org_writer.writerow(output_row)

            output_row = self.schema.get_acct_party_row(mapping_id, acct_id, party_id)
            map_writer.writerow(output_row)
            mapping_id += 1

        in_acct_f.close()
        # BUG FIX: out_acct_f was never closed in the original code, which
        # leaked the handle and could leave the account CSV partially
        # unflushed until interpreter exit.
        out_acct_f.close()
        out_ind_f.close()
        out_org_f.close()
        out_map_f.close()
        out_ent_f.close()

        # ---- Transaction log conversion ----
        # De-duplicate on (orig, dest, type, amount, date)
        tx_set = set()
        cash_tx_set = set()

        reader = csv.reader(in_tx_f)
        tx_writer = csv.writer(out_tx_f)
        cash_tx_writer = csv.writer(out_cash_tx_f)
        alert_tx_writer = csv.writer(out_alert_tx_f)

        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        num_columns = len(header)

        tx_header = self.schema.tx_names
        alert_header = self.schema.alert_tx_names
        tx_writer.writerow(tx_header)
        cash_tx_writer.writerow(tx_header)
        alert_tx_writer.writerow(alert_header)

        step_idx = indices["step"]
        amt_idx = indices["amount"]
        orig_idx = indices["nameOrig"]
        dest_idx = indices["nameDest"]
        sar_idx = indices["isSAR"]
        alert_idx = indices["alertID"]
        type_idx = indices["type"]

        tx_id = 1
        for row in reader:
            if len(row) < num_columns:
                continue  # skip truncated rows
            try:
                days = int(row[step_idx])
                date_str = str(days)
                amount = row[amt_idx]
                orig_id = row[orig_idx]
                dest_id = row[dest_idx]
                sar_id = int(row[sar_idx])
                alert_id = int(row[alert_idx])

                is_sar = sar_id > 0
                is_alert = alert_id >= 0
                ttype = row[type_idx]
            except ValueError:
                continue  # skip rows with non-numeric step/sar/alert fields

            attr = {name: row[index] for name, index in indices.items()}
            if ttype in CASH_TYPES:
                cash_tx = (orig_id, dest_id, ttype, amount, date_str)
                if cash_tx not in cash_tx_set:
                    cash_tx_set.add(cash_tx)
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    cash_tx_writer.writerow(output_row)
            else:
                tx = (orig_id, dest_id, ttype, amount, date_str)
                if tx not in tx_set:
                    output_row = self.schema.get_tx_row(tx_id, date_str, amount, ttype, orig_id, dest_id,
                                                        is_sar, alert_id, **attr)
                    tx_writer.writerow(output_row)
                    tx_set.add(tx)
                    if is_alert:
                        # NOTE(review): assumes convert_alert_members() has
                        # already registered this alert_id; otherwise
                        # .get() returns None and this raises AttributeError.
                        alert_type = self.reports.get(alert_id).get_reason()
                        alert_row = self.schema.get_alert_tx_row(alert_id, alert_type, is_sar, tx_id, orig_id, dest_id,
                                                                 ttype, amount, date_str, **attr)
                        alert_tx_writer.writerow(alert_row)

            if tx_id % 1000000 == 0:
                print("Converted %d transactions." % tx_id)
            tx_id += 1

        in_tx_f.close()
        out_tx_f.close()
        out_cash_tx_f.close()
        out_alert_tx_f.close()

        # Optional degree statistics, enabled via the DEGREE env variable
        deg_param = os.getenv("DEGREE")
        if deg_param:
            max_threshold = int(deg_param)
            pred = defaultdict(set)
            succ = defaultdict(set)
            for orig, dest, _, _, _ in tx_set:
                pred[dest].add(orig)
                succ[orig].add(dest)
            in_degrees = [len(nbs) for nbs in pred.values()]
            out_degrees = [len(nbs) for nbs in succ.values()]
            in_deg = Counter(in_degrees)
            out_deg = Counter(out_degrees)
            for th in range(2, max_threshold+1):
                num_fan_in = sum([c for d, c in in_deg.items() if d >= th])
                num_fan_out = sum([c for d, c in out_deg.items() if d >= th])
                print("Number of fan-in / fan-out patterns with", th, "neighbors", num_fan_in, "/", num_fan_out)

    def convert_alert_members(self):
        """Convert the alert member list and register each alert as an
        AMLTypology in self.reports (used later by convert_acct_tx and
        output_sar_cases).
        """
        input_file = self.group_file
        output_file = self.alert_acct_file

        print("Load alert groups: %s" % input_file)
        # BUG FIX: the input and output files were never closed in the
        # original code; use context managers instead.
        with open(os.path.join(self.input_dir, input_file), "r") as rf, \
                open(os.path.join(self.work_dir, output_file), "w") as wf:
            reader = csv.reader(rf)
            header = next(reader)
            indices = {name: index for index, name in enumerate(header)}

            writer = csv.writer(wf)
            header = self.schema.alert_acct_names
            writer.writerow(header)

            for row in reader:
                reason = row[indices["reason"]]
                alert_id = int(row[indices["alertID"]])
                account_id = int(row[indices["accountID"]])
                is_sar = row[indices["isSAR"]].lower() == "true"
                model_id = row[indices["modelID"]]
                schedule_id = row[indices["scheduleID"]]
                bank_id = row[indices["bankID"]]

                if alert_id not in self.reports:
                    self.reports[alert_id] = AMLTypology(reason)
                self.reports[alert_id].add_member(account_id, is_sar)

                attr = {name: row[index] for name, index in indices.items()}
                # The account ID is also used as the account name column
                output_row = self.schema.get_alert_acct_row(alert_id, reason, account_id, account_id, is_sar,
                                                            model_id, schedule_id, bank_id, **attr)
                writer.writerow(output_row)

    def output_sar_cases(self):
        """Extract SAR account list involved in alert transactions from transaction log file
        """
        input_file = self.log_file
        output_file = os.path.join(self.work_dir, self.sar_acct_file)

        print("Convert SAR typologies from %s to %s" % (input_file, output_file))
        with open(input_file, "r") as rf:
            reader = csv.reader(rf)
            alerts = self.sar_accounts(reader)

        with open(output_file, "w") as wf:
            writer = csv.writer(wf)
            self.write_sar_accounts(writer, alerts)

    def sar_accounts(self, reader):
        """Collect the SAR account rows for every registered typology.

        Reads the transaction log rows, attaches each alert transaction to
        its typology, then emits one row per not-yet-recorded member account.

        :param reader: csv.reader over the transaction log (header included)
        :return: List of SAR account tuples
            (alert_id, acct_id, cust_id, event_date, reason, org_type, is_sar)
        """
        header = next(reader)
        indices = {name: index for index, name in enumerate(header)}
        columns = len(header)

        tx_id = 0
        for row in reader:
            if len(row) < columns:
                continue  # skip truncated rows
            try:
                days = int(row[indices["step"]])
                amount = float(row[indices["amount"]])
                orig = int(row[indices["nameOrig"]])
                dest = int(row[indices["nameDest"]])
                alert_id = int(row[indices["alertID"]])
                orig_name = "C_%d" % orig
                dest_name = "C_%d" % dest
            except ValueError:
                continue  # skip rows with non-numeric fields

            if alert_id >= 0 and alert_id in self.reports:
                attr = {name: row[index] for name, index in indices.items()}
                self.reports[alert_id].add_tx(tx_id, amount, days, orig, dest, orig_name, dest_name, attr)
            tx_id += 1

        sar_accounts = list()
        count = 0
        num_reports = len(self.reports)
        for sar_id, typology in self.reports.items():
            if typology.count == 0:
                continue  # typology without any logged transaction
            reason = typology.get_reason()
            is_sar = "YES" if typology.is_sar else "NO"
            for key, transaction in typology.transactions.items():
                amount, step, orig_acct, dest_acct, orig_name, dest_name, attr = transaction
                # Record each endpoint account at most once across all alerts
                if (self.account_recorded(orig_acct)
                        and self.account_recorded(dest_acct)):
                    continue
                if not self.account_recorded(orig_acct):
                    acct_id = orig_acct
                    cust_id = orig_name
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason, self.org_type(acct_id), is_sar))
                if not self.account_recorded(dest_acct):
                    acct_id = dest_acct
                    cust_id = dest_name
                    typology.recorded_members.add(acct_id)
                    sar_accounts.append((sar_id, acct_id, cust_id, days_to_date(step), reason, self.org_type(acct_id), is_sar))

            count += 1
            if count % 100 == 0:
                print("SAR Typologies: %d/%d" % (count, num_reports))
        return sar_accounts

    def org_type(self, acct_id):
        """Map the raw account type to "INDIVIDUAL" ("I") or "COMPANY" (anything else).

        NOTE(review): raises KeyError when acct_id was never seen by
        convert_acct_tx — assumed to always be present; confirm ordering.
        """
        return "INDIVIDUAL" if self.org_types[acct_id] == "I" else "COMPANY"

    def write_sar_accounts(self, writer, sar_accounts):
        """Write the SAR account header row followed by all collected rows."""
        writer.writerow(
            ["ALERT_ID", "ACCOUNT_ID", "CUSTOMER_ID", "EVENT_DATE",
             "ALERT_TYPE", "ACCOUNT_TYPE", "IS_SAR"])

        for alert in sar_accounts:
            writer.writerow(alert)

    def account_recorded(self, acct_id):
        """Whether acct_id was already written to the SAR account list by any typology."""
        return any(acct_id in typology.recorded_members
                   for typology in self.reports.values())
|
|
|
|
if __name__ == "__main__":
    argv = sys.argv

    if len(argv) < 2:
        print("Usage: python3 %s [ConfJSON] [SimName]" % argv[0])
        sys.exit(1)

    _conf_json = argv[1]
    # Optional simulation name override (falls back to the config value)
    _sim_name = argv[2] if len(argv) >= 3 else None

    with open(_conf_json, "r") as rf:
        conf = json.load(rf)

    # Seed Faker before generating any data so the output is reproducible.
    fake = Faker(['en_US'])
    Faker.seed(0)

    # BUG FIX: the converter was previously constructed twice — once without
    # the Faker instance and then again with it — which duplicated the
    # constructor's side effects (directory creation, diameter-log copy).
    # Only the fully-initialized converter is created now.
    converter = LogConverter(conf, _sim_name, fake)
    converter.convert_alert_members()
    converter.convert_acct_tx()
    converter.output_sar_cases()
|
|